libevent源码学习-----时间管理
libevent監聽的event有以下幾種
- 文件描述符/套接字,沒有設定超時時長
- 信號
- 文件描述符/套接字,設定超時時長
對于時間,libevent內部的時間管理是通過最小堆實現的,原因如下
- 既然某些fd有規定的超時時長,那么io多路復用函數就不能永久阻塞,需要設定一個超時時長(最后一個參數)
- 用戶在使用event_add設定的時間是相對于event_add調用的相對時間,這就導致所有具有超時時長的event什么時候超時是雜亂無章的,沒辦法為io函數設定某個時長
針對以上原因,libevent內部將所有的event超時時長全部轉化為絕對時間,有以下幾點好處
- 可以對所有的超時時間進行排序,獲得最早超時的那個event
- 判斷是否超時時只需和現有時間比較大小,不需要每次都進行相對時間的判斷
- 可以采用最小堆存儲所有具有超時時間的event,如果堆頂event未超時,那么所有的event都不會超時,可以選擇這個event的超時時長最為io函數的阻塞時間
使用絕對時間帶來的麻煩是如果event是一個永久事件,那么當event被激活后仍然需要重新注冊到base中,此時因為event的時間是絕對時間的緣故,不能夠直接調用event_add_internal添加event,而是需要重新計算超時時間再添加,這就導致仍然需要在event中存儲用戶提供的超時時長,在重新添加之前計算絕對時間的超時時間
/** event_add調用的內部函數,用于將event添加到base的注冊隊列中* 同時添加到相應的map中* * 注意:這個函數不僅僅只由event_add調用,還有event_persist_closure調用* 由這個函數調用是因為當具有超時時間的event被激活后,需要先從base中的所有隊列中刪除* 然后重新計算超時時間,再重新添加到base中,所以又重新調用了這個函數* * 注意:event不僅代表文件描述符,還有可能是信號的event,當是信號時,會遞歸* 調用兩遍這個函數,第一遍調用時判斷是信號則調用evsig_map_add函數,在這個函數中* 進行兩步* 將信號event添加到base的信號map中* 調用evsigops的add函數,即調用evsig_add,這個函數中綁定內部信號處理函數,同時將socketpair的event* 添加到base中,使用event_add,也就是調用event_add_internal* 不過只會執行兩遍,因為在evsig_add中會進行判斷,只有第一次添加socketpair的event時才會執行第二次調用* * 見evsig_add*/ static inline int event_add_internal(struct event *ev, const struct timeval *tv,int tv_is_absolute) {struct event_base *base = ev->ev_base;int res = 0;int notify = 0;EVENT_BASE_ASSERT_LOCKED(base);/* ... *//** prepare for timeout insertion further below, if we get a* failure on any step, we should not change any state.*//** 這一步主要是用來讓最小堆增加一個位置,并沒有實際添加到最小堆上* 判斷條件是這是一個具有超時時間的event,同時在最小堆中沒有這個event* 這樣就需要在最小堆上留出一個位置來存放這個event* 因為用戶可以對同一個event調用event_add多次,這就可能兩次event_add除了超時時間不同* 其他的都相同,這樣就不需要在留出一個位置,直接替換以前的就可以* * 如果已經在最小堆中,ev_flags將是EVLIST_TIMEOUT*/if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {if (min_heap_reserve(&base->timeheap,1 + min_heap_size(&base->timeheap)) == -1) return (-1); /* ENOMEM == errno */}/** we should change the timeout state only if the previous event* addition succeeded.*//* 這一步開始處理具有超時時間的event */if (res != -1 && tv != NULL) {struct timeval now;int common_timeout;/** for persistent timeout events, we remember the* timeout value and re-add the event.** If tv_is_absolute, this was already set.*//** event分為永久的和一次的,是用戶在調用event_new時傳入的參數* 對于永久的event,在被激活一次之后還需要繼續監聽,* 而對于有超時時間的event,需要對event的超時時間進行更新* * 為什么:因為base在進行超時判斷時是通過絕對時間進行判斷的,也就是說在添加event的時候* 將當前時間+時間間隔獲得的絕對時間作為判斷超時的依據* 這樣做的原因是不需要在判斷超時時比較時間差,只需要比較當前時間和超時時間即可* * 所以,如果event是永久的,那么再處理過一次之后需要更新超時絕對時間,方法就是保存用戶* 傳入的時間間隔,再下一次添加時使用** tv_is_absolute是傳入的參數,event_add傳入時設為0,表示傳入的時間是時間間隔,不是絕對時間*/if (ev->ev_closure == EV_CLOSURE_PERSIST && !tv_is_absolute)ev->ev_io_timeout = *tv;/** we already reserved memory above for the case where we* are not replacing an existing timeout.*//** 對于用戶對同一個event調用event_add多次的情況,先將以前的從最小堆* 中刪除,再添加更新的這個*/if (ev->ev_flags & EVLIST_TIMEOUT) {/* XXX I believe this is needless. */if (min_heap_elt_is_top(ev))notify = 1;event_queue_remove(base, ev, EVLIST_TIMEOUT);}/* Check if it is active due to a timeout. Rescheduling* this timeout before the callback can be executed* removes it from the active list. *//* * 如果此時event正處于激活隊列中,從激活隊列刪了 * 如果是信號,將其發生次數設為0,就不會調用信號處理函數了*/if ((ev->ev_flags & EVLIST_ACTIVE) &&(ev->ev_res & EV_TIMEOUT)) {if (ev->ev_events & EV_SIGNAL) {/* See if we are just active executing* this event in a loop*/if (ev->ev_ncalls && ev->ev_pncalls) {/* Abort loop */*ev->ev_pncalls = 0;}}event_queue_remove(base, ev, EVLIST_ACTIVE);}/* 計算超時絕對事件 */gettime(base, &now);common_timeout = is_common_timeout(tv, base);if (tv_is_absolute) {ev->ev_timeout = *tv;} else if (common_timeout) {struct timeval tmp = *tv;tmp.tv_usec &= MICROSECONDS_MASK;evutil_timeradd(&now, &tmp, &ev->ev_timeout);ev->ev_timeout.tv_usec |=(tv->tv_usec & ~MICROSECONDS_MASK);} else {evutil_timeradd(&now, tv, &ev->ev_timeout);}event_debug(("event_add: timeout in %d seconds, call %p",(int)tv->tv_sec, ev->ev_callback));/* 調用event_queue_insert()將具有超時時間的event添加到base最小堆中 */event_queue_insert(base, ev, EVLIST_TIMEOUT);/* See if the earliest timeout is now earlier than it* was before: if so, we will need to tell the main* thread to wake up earlier than it would* otherwise. */if (min_heap_elt_is_top(ev))notify = 1;} return (res); }這個函數被event_add調用,用于添加所有的event到base中,很明顯函數內部是調用event_queue_insert添加event的,event_add_internal主要用于分配event,判斷它應該添加到base的哪個地方。
下面是event_queue_insert添加到最小堆的部分
上述這些都是為了能夠處理超時event做鋪墊,現在轉到event_base_loop中
/** 實際的事件驅動循環,其實就是一個while循環,每次調用io復用函數進行事件監聽* 監聽返回之前將活躍的event都按優先級添加到base的激活隊列中* 回到循環后對base的激活隊列中的event按照優先級順序調用回調函數* 再根據是否是永久event決定要不要從base的所有隊列中刪除event* 對于具有超時時間的event則需要特殊處理,見timeout_process*/ int event_base_loop(struct event_base *base, int flags) {const struct eventop *evsel = base->evsel;struct timeval tv;struct timeval *tv_p;int res, done, retval = 0;done = 0;while (!done) {tv_p = &tv;/* * 這一步是用來獲取io復用函數的阻塞時間,此時有兩種情況* 當此時沒有處于激活狀態的event,就從最小堆中取得堆頂event的超時時間* 如果有已經激活的event,則阻塞時間為0,直接處理,原因可能是因為其他線程激活了某個時間造成的* timeout_next函數取得最小堆堆頂元素的超時時間,并與當前時間做差計算時間間隔* evutil_timeclear直接清零時間* 這么取阻塞時間的原因見下面*/if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {timeout_next(base, &tv_p);} else {/** if we have active events, we just poll new events* without waiting.*/evutil_timerclear(&tv);}/** 調用Io復用函數的監聽函數,開始阻塞/非阻塞的監聽* 超時時間設置為最小堆中堆頂event的超時時間,原因如下* * 此時監聽的有三種event* 第一種是沒有設置超時時間的,包括信號,所以什么時候返回都不影響* 第二種是取得最小超時時間的堆頂event,此時可以滿足在超時時間返回* 第三種是最小堆中的其他event,這些event的超時時間在堆頂event之后,因為超時時間是絕對時間* 也就是說如果堆頂event沒有超時,那么其它的event將不可能超時* 而當最小超時時間后返回處理超時之后重新開始監聽,* 因為是絕對時間,所以不會影響最小堆的其他event的超時** 在返回之間,將活躍的event添加到base的激活隊列中* * 注意:不處理具有超時時間的event,因為這些event根本就沒有添加到io函數中* 處理這些是在timeout_process函數中*/res = evsel->dispatch(base, tv_p);/** 專門處理超時的event* 注意為什么可以單獨處理超時event,因為具有超時時間的event都被添加到最小堆里面了* 這樣,只需要遍歷最小堆,用堆元素的超時時間和當前時間比較,就可以判斷是否超時* 其實不需要全部遍歷,而是當遇到第一個沒有超時的event就可以退出遍歷* 因為最小堆的當前節點永遠比兩個子節點小,所以子節點的超時時間會更長,不可能超時* * 需要遍歷而不是只取堆頂的原因是在從io函數返回到執行timeout_process的過程中* 其他線程可能又向最小堆中添加了超時時間更小的event,這就導致了事先使用的* 時間的那個event已經不是堆頂元素了*/timeout_process(base);}return (retval); }下面是timeout_process函數
/* * 將最小堆中超時的event添加到激活隊列中* 此函數由event_base_loop調用*/ static void timeout_process(struct event_base *base) {/* Caller must hold lock. */struct timeval now;struct event *ev;if (min_heap_empty(&base->timeheap)) {return;}//獲取當前時間,用于判斷是否超時gettime(base, &now);/* * 需要循環判斷而不是只取堆頂的原因* 因為其它線程有可能添加了更小的超時event* 見event_base_loop中timeout_process的調用*/while ((ev = min_heap_top(&base->timeheap))) {//當遇到第一個沒有超時的event就可以退出了,原因見base的主循環if (evutil_timercmp(&ev->ev_timeout, &now, >))break;/* delete this event from the I/O queues *//* * 取出一個就將這個event從所有隊列中刪除* 然后再添加到激活隊列中* 原因:因為要重新計算超時時間,需要從所有隊列中刪除*/event_del_internal(ev);event_debug(("timeout_process: call %p",ev->ev_callback));/** 將event添加到超時隊列中* 此處需要設置event被激活的原因,EV_TIMEOUT表示由于超時被激活*/event_active_nolock(ev, EV_TIMEOUT, 1);} }總結
至此就完成了對具有超時時長的event的監控和處理,libevent的時間管理可以學習的地方在于將相對時間轉換成絕對時間,不要一根勁,以為用戶提供相對時間程序設計的時候就必須使用相對時間,反而可以變相思考,進行適當轉換,便于程序設計。libevent在轉換后既解決了io復用函數阻塞時間問題,又提高了時間管理的效率(使用最小堆)
總結
以上是生活随笔為你收集整理的libevent源码学习-----时间管理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: libevent源码学习----io多路
- 下一篇: libevent源码学习-----eve