處理大并發之四?libevent?demo詳細分析(對比epoll)
libevent默認情況下是單線程,每個線程有且僅有一個event_base,對應一個struct?event_base結構體,以及賦予其上的事件管理器,用來安排托管給它的一系列的事件。
當有一個事件發生的時候,event_base會在合適的時間去調用綁定在這個事件上的函數,直到這個函數執行完成,然后在返回安排其他事件。需要注意的是:合適的時間并不是立即。
例如:
[cpp] view plaincopy print?
struct?event_base?*base;??base?=?event_base_new();??
struct event_base *base;
base = event_base_new();//初始化libevent
event_base_new對比epoll,可以理解為epoll里的epoll_create。
event_base內部有一個循環,循環阻塞在epoll調用上,當有一個事件發生的時候,才會去處理這個事件。其中,這個事件是被綁定在event_base上面的,每一個事件就會對應一個struct?event,可以是監聽的fd。?
其中struct?event?使用event_new?來創建和綁定,使用event_add來啟用,例如:
[cpp] view plaincopy print?
struct?event?*listener_event;??listener_event?=?event_new(base,?listener,?EV_READ|EV_PERSIST,?do_accept,?(void*)base);??
struct event *listener_event;
listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
參數說明:
base:event_base類型,event_base_new的返回值
listener:監聽的fd,listen的fd
EV_READ|EV_PERSIST:事件的類型及屬性
do_accept:綁定的回調函數
(void*)base:給回調函數的參數
event_add(listener_event,?NULL);
對比epoll:
event_new相當于epoll中的epoll_wait,其中的epoll里的while循環,在libevent里使用event_base_dispatch。
event_add相當于epoll中的epoll_ctl,參數是EPOLL_CTL_ADD,添加事件。
注:libevent支持的事件及屬性包括(使用bitfield實現,所以要用?|?來讓它們合體)
EV_TIMEOUT:?超時
EV_READ:?只要網絡緩沖中還有數據,回調函數就會被觸發
EV_WRITE:?只要塞給網絡緩沖的數據被寫完,回調函數就會被觸發
EV_SIGNAL:?POSIX信號量
EV_PERSIST:?不指定這個屬性的話,回調函數被觸發后事件會被刪除
EV_ET:?Edge-Trigger邊緣觸發,相當于EPOLL的ET模式
事件創建添加之后,就可以處理發生的事件了,相當于epoll里的epoll_wait,在libevent里使用event_base_dispatch啟動event_base循環,直到不再有需要關注的事件。
有了上面的分析,結合之前做的epoll服務端程序,對于一個服務器程序,流程基本是這樣的:
1.?創建socket,bind,listen,設置為非阻塞模式
2.?創建一個event_base,即
[cpp] view plaincopy print?
struct?event_base?*??event_base_new(void)??
struct event_base * event_base_new(void)
3.?創建一個event,將該socket托管給event_base,指定要監聽的事件類型,并綁定上相應的回調函數(及需要給它的參數)。即
[cpp] view plaincopy print?
struct?event?*??event_new(struct?event_base?*base,?evutil_socket_t?fd,?short?events,?void?(*cb)(evutil_socket_t,?short,?void?*),?void?*arg)??
struct event * event_new(struct event_base *base, evutil_socket_t fd, short events, void (*cb)(evutil_socket_t, short, void *), void *arg)
4.?啟用該事件,即
[cpp] view plaincopy print?
int??event_add(struct?event?*ev,?const?struct?timeval?*tv)??
int event_add(struct event *ev, const struct timeval *tv)
5.??進入事件循環,即
[cpp] view plaincopy print?
int??event_base_dispatch(struct?event_base?*event_base)??
int event_base_dispatch(struct event_base *event_base)
?
有了這些知識儲備,來看下官網上的demo,網址:http://www.wangafu.net/~nickm/libevent-book/01_intro.html,這里引用的例子是Example:?A?low-level?ROT13?server?with?Libevent
首先來翻譯下例子上面的一段話:
對于select函數來說,不同的操作系統有不同的代替函數,它包括:poll,epoll,kqueue,evport和/dev/poll。這些函數的性能都比select要好,其中epoll在IO中添加,刪除,通知socket準備好方面性能復雜度為O(1)。
不幸的是,沒有一個有效的接口是一個普遍存在的標準,Linux下有epoll,BSDS有kqueue,Solaris?有evport和/dev/poll,等等。沒有任何一個操作系統有它們中所有的,所以如果你想做一個輕便的高性能的異步應用程序,你就需要把這些接口抽象的封裝起來,并且無論哪一個系統使用它都是最高效的。
這對于你來說就是最低級的libevent?API,它提供了統一的接口取代了select,當它在計算機上運行的時候,使用了最有效的版本。
這里是ROT13服務器的另外一個版本,這次,他使用了libevent代替了select。這意味著我們不再使用fd_sets,取而代之的使用event_base添加和刪除事件,它可能在select,poll,epoll,kqueue等中執行。
代碼分析:
這是一個服務端的程序,可以處理客戶端大并發的連接,當收到客戶端的連接后,將收到的數據做了一個變換,如果是?’a’-‘m’之間的字符,將其增加13,如果是?’n’-‘z’之間的字符,將其減少13,其他字符不變,然后將轉換后的數據發送給客戶端。
例如:客戶端發送:Client?0?send??Message!
服務端會回復:Pyvrag?0?fraq??Zrffntr!
在這個代碼中沒有使用bufferevent這個強大的東西,在一個結構體中自己管理了一個緩沖區。結構體為:
[cpp] view plaincopy print?
struct?fd_state?{??????char?buffer[MAX_LINE];??????size_t?buffer_used;????????size_t?n_written;??????size_t?write_upto;????????struct?event?*read_event;??????struct?event?*write_event;??};??
struct fd_state {char buffer[MAX_LINE];//緩沖區的大小size_t buffer_used;//接收到已經使用的buffer大小,每次將接收到的數據字節數相加,當發送的字節數累計相加和buffer_used都相等時候,將它們都置為1size_t n_written;//發送的累加字節數size_t write_upto;//相當于一個臨時變量,當遇到換行符的時,將其收到的字節數(換行符除外)賦給該值,當檢測到寫事件的時候,用已經發送的字節數和該數值做比較,若收到的字節總數小于該值,則發送數據,等于該值,將結構體中3個字節數統計變量都置為1,為什么會置為1呢,因為有一個換行符吧。struct event *read_event;struct event *write_event;
};
代碼中自己管理了一個緩沖區,用于存放接收到的數據,發送的數據將其轉換后也放入該緩沖區中,代碼晦澀難懂,我也是經過打日志分析后,才明白點,這個緩沖區自己還得控制好。但是libevent?2已經提供了一個神器bufferevent,我們在使用的過程中最好不要自己管理這個緩沖區,之所以分析這個代碼,是為了熟悉libevent?做服務端程序的流程及原理。
下面是代碼,加有部分注釋和日志:
代碼:lowlevel_libevent_server.c?
[cpp] view plaincopy print?
??????#include?<netinet/in.h>????#include?<sys/socket.h>????#include?<fcntl.h>????#include?<event2/event.h>????#include?<assert.h>??#include?<unistd.h>??#include?<string.h>??#include?<stdlib.h>??#include?<stdio.h>??#include?<errno.h>????#define?MAX_LINE?80????void?do_read(evutil_socket_t?fd,?short?events,?void?*arg);??void?do_write(evutil_socket_t?fd,?short?events,?void?*arg);????char?rot13_char(char?c)??{?????????????if?((c?>=?'a'?&&?c?<=?'m')?||?(c?>=?'A'?&&?c?<=?'M'))??????????return?c?+?13;??????else?if?((c?>=?'n'?&&?c?<=?'z')?||?(c?>=?'N'?&&?c?<=?'Z'))??????????return?c?-?13;??????else??????????return?c;??}????struct?fd_state?{??????char?buffer[MAX_LINE];??????size_t?buffer_used;????????size_t?n_written;??????size_t?write_upto;????????struct?event?*read_event;??????struct?event?*write_event;??};????struct?fd_state?*?alloc_fd_state(struct?event_base?*base,?evutil_socket_t?fd)??{??????struct?fd_state?*state?=?malloc(sizeof(struct?fd_state));??????if?(!state)??????????return?NULL;????????state->read_event?=?event_new(base,?fd,?EV_READ|EV_PERSIST,?do_read,?state);??????if?(!state->read_event)??????{??????????free(state);??????????return?NULL;??????}????????state->write_event?=?event_new(base,?fd,?EV_WRITE|EV_PERSIST,?do_write,?state);??????if?(!state->write_event)??????{??????????event_free(state->read_event);??????????free(state);??????????return?NULL;??????}????????state->buffer_used?=?state->n_written?=?state->write_upto?=?0;????????assert(state->write_event);??????return?state;??}????void?free_fd_state(struct?fd_state?*state)??{??????event_free(state->read_event);??????event_free(state->write_event);??????free(state);??}????void?do_read(evutil_socket_t?fd,?short?events,?void?*arg)??{??????struct?fd_state?*state?=?arg;??????char?buf[20];??????int?i;??????ssize_t?result;??????printf("\ncome?in?do_read:?fd:?%d,?state->buffer_used:?%d,?sizeof(state->buffer):?%d\n",?fd,?state->buffer_used,?size??of(state->buffer));??????while?(1)??????{??????????assert(state->write_event);??????????result?=?recv(fd,?buf,?sizeof(buf),?0);??????????if?(result?<=?0)??????????????break;??????????printf("recv?once,?fd:?%d,?recv?size:?%d,?recv?buff:?%s\n",?fd,?result,?buf);????????????for?(i=0;?i?<?result;?++i)??????????{??????????????if?(state->buffer_used?<?sizeof(state->buffer))??????????????????state->buffer[state->buffer_used++]?=?rot13_char(buf[i]);????????????????if?(buf[i]?==?'\n')???????????????{??????????????????assert(state->write_event);??????????????????event_add(state->write_event,?NULL);??????????????????state->write_upto?=?state->buffer_used;??????????????????printf("遇到換行符,state->write_upto:?%d,?state->buffer_used:?%d\n",state->write_upto,?state->buffer_use??d);??????????????}??????????}??????????printf("recv?once,?state->buffer_used:?%d\n",?state->buffer_used);??}??????????????if?(result?==?0)??????{??????????free_fd_state(state);??????}??????else?if?(result?<?0)??????{??????????if?(errno?==?EAGAIN)???????????????return;??????????perror("recv");??????????free_fd_state(state);??????}??}????void?do_write(evutil_socket_t?fd,?short?events,?void?*arg)??{??????struct?fd_state?*state?=?arg;????????printf("\ncome?in?do_write,?fd:?%d,?state->n_written:?%d,?state->write_upto:?%d\n",fd,?state->n_written,?state->write??_upto);??????while?(state->n_written?<?state->write_upto)??????{??????????ssize_t?result?=?send(fd,?state->buffer?+?state->n_written,?state->write_upto?-?state->n_written,?0);??????????if?(result?<?0)?{??????????????if?(errno?==?EAGAIN)???????????????????return;??????????????free_fd_state(state);??????????????return;??????????}??????????assert(result?!=?0);????????????state->n_written?+=?result;??????????printf("send?fd:?%d,?send?size:?%d,?state->n_written:?%d\n",?fd,?result,?state->n_written);??????}????????if?(state->n_written?==?state->buffer_used)??????{??????????printf("state->n_written?==?state->buffer_used:?%d\n",?state->n_written);??????????state->n_written?=?state->write_upto?=?state->buffer_used?=?1;??????????printf("state->n_written?=?state->write_upto?=?state->buffer_used?=?1\n");??????}????????event_del(state->write_event);??}????void?do_accept(evutil_socket_t?listener,?short?event,?void?*arg)??{??????struct?event_base?*base?=?arg;??????struct?sockaddr_storage?ss;??????socklen_t?slen?=?sizeof(ss);??????int?fd?=?accept(listener,?(struct?sockaddr*)&ss,?&slen);??????if?(fd?<?0)??????{???????????perror("accept");??????}??????else?if?(fd?>?FD_SETSIZE)??????{??????????close(fd);???????}??????else??????{??????????struct?fd_state?*state;??????????evutil_make_socket_nonblocking(fd);??????????state?=?alloc_fd_state(base,?fd);??????????assert(state);???????????assert(state->write_event);??????????event_add(state->read_event,?NULL);??????}??}????void?run(void)??{??????evutil_socket_t?listener;??????struct?sockaddr_in?sin;??????struct?event_base?*base;??????struct?event?*listener_event;????????base?=?event_base_new();??????if?(!base)??????????return;?????????sin.sin_family?=?AF_INET;??????sin.sin_addr.s_addr?=?0;??????sin.sin_port?=?htons(8000);????????listener?=?socket(AF_INET,?SOCK_STREAM,?0);??????evutil_make_socket_nonblocking(listener);????#ifndef?WIN32??????{??????????int?one?=?1;??????????setsockopt(listener,?SOL_SOCKET,?SO_REUSEADDR,?&one,?sizeof(one));??????}??#endif????????if?(bind(listener,?(struct?sockaddr*)&sin,?sizeof(sin))?<?0)??????{??????????perror("bind");??????????return;?? }?? ?? ?? ????if?(listen(listener,?16)<0)?? ????{?? ????????perror("listen");?? ????????return;?? ????}?? ?? ????listener_event?=?event_new(base,?listener,?EV_READ|EV_PERSIST,?do_accept,?(void*)base);?? ?????? ????event_add(listener_event,?NULL);?? ?? ????event_base_dispatch(base);?? }?? ?? int?main(int?c,?char?**v)?? {?? ?? ?? ????run();?? ????return?0;?? }??
//說明,為了使我們的代碼兼容win32網絡API,我們使用evutil_socket_t代替int,使用evutil_make_socket_nonblocking代替fcntl/* For sockaddr_in */
#include <netinet/in.h>
/* For socket functions */
#include <sys/socket.h>
/* For fcntl */
#include <fcntl.h>#include <event2/event.h>#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>#define MAX_LINE 80void do_read(evutil_socket_t fd, short events, void *arg);
void do_write(evutil_socket_t fd, short events, void *arg);char rot13_char(char c)
{/* We don't want to use isalpha here; setting the locale would change* which characters are considered alphabetical. */if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))return c + 13;else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))return c - 13;elsereturn c;
}struct fd_state {char buffer[MAX_LINE];size_t buffer_used;size_t n_written;size_t write_upto;struct event *read_event;struct event *write_event;
};struct fd_state * alloc_fd_state(struct event_base *base, evutil_socket_t fd)
{struct fd_state *state = malloc(sizeof(struct fd_state));if (!state)return NULL;state->read_event = event_new(base, fd, EV_READ|EV_PERSIST, do_read, state);if (!state->read_event){free(state);return NULL;}state->write_event = event_new(base, fd, EV_WRITE|EV_PERSIST, do_write, state);if (!state->write_event){event_free(state->read_event);free(state);return NULL;}state->buffer_used = state->n_written = state->write_upto = 0;assert(state->write_event);return state;
}void free_fd_state(struct fd_state *state)
{event_free(state->read_event);event_free(state->write_event);free(state);
}void do_read(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;char buf[20];int i;ssize_t result;printf("\ncome in do_read: fd: %d, state->buffer_used: %d, sizeof(state->buffer): %d\n", fd, state->buffer_used, size
of(state->buffer));while (1){assert(state->write_event);result = recv(fd, buf, sizeof(buf), 0);if (result <= 0)break;printf("recv once, fd: %d, recv size: %d, recv buff: %s\n", fd, result, buf);for (i=0; i < result; ++i){if (state->buffer_used < sizeof(state->buffer))//如果讀事件的緩沖區還未滿,將收到的數據做轉換state->buffer[state->buffer_used++] = rot13_char(buf[i]);
// state->buffer[state->buffer_used++] = buf[i];//接收什么發送什么,不經過轉換,測試用if (buf[i] == '\n') //如果遇到換行,添加寫事件,并設置寫事件的大小{assert(state->write_event);event_add(state->write_event, NULL);state->write_upto = state->buffer_used;printf("遇到換行符,state->write_upto: %d, state->buffer_used: %d\n",state->write_upto, state->buffer_use
d);}}printf("recv once, state->buffer_used: %d\n", state->buffer_used);
}//判斷最后一次接收的字節數if (result == 0){free_fd_state(state);}else if (result < 0){if (errno == EAGAIN) // XXXX use evutil macroreturn;perror("recv");free_fd_state(state);}
}void do_write(evutil_socket_t fd, short events, void *arg)
{struct fd_state *state = arg;printf("\ncome in do_write, fd: %d, state->n_written: %d, state->write_upto: %d\n",fd, state->n_written, state->write
_upto);while (state->n_written < state->write_upto){ssize_t result = send(fd, state->buffer + state->n_written, state->write_upto - state->n_written, 0);if (result < 0) {if (errno == EAGAIN) // XXX use evutil macroreturn;free_fd_state(state);return;}assert(result != 0);state->n_written += result;printf("send fd: %d, send size: %d, state->n_written: %d\n", fd, result, state->n_written);}if (state->n_written == state->buffer_used){printf("state->n_written == state->buffer_used: %d\n", state->n_written);state->n_written = state->write_upto = state->buffer_used = 1;printf("state->n_written = state->write_upto = state->buffer_used = 1\n");}event_del(state->write_event);
}void do_accept(evutil_socket_t listener, short event, void *arg)
{struct event_base *base = arg;struct sockaddr_storage ss;socklen_t slen = sizeof(ss);int fd = accept(listener, (struct sockaddr*)&ss, &slen);if (fd < 0){ // XXXX eagain??perror("accept");}else if (fd > FD_SETSIZE){close(fd); // XXX replace all closes with EVUTIL_CLOSESOCKET */}else{struct fd_state *state;evutil_make_socket_nonblocking(fd);state = alloc_fd_state(base, fd);assert(state); /*XXX err*/assert(state->write_event);event_add(state->read_event, NULL);}
}void run(void)
{evutil_socket_t listener;struct sockaddr_in sin;struct event_base *base;struct event *listener_event;base = event_base_new();//初始化libeventif (!base)return; /*XXXerr*/sin.sin_family = AF_INET;sin.sin_addr.s_addr = 0;//本機sin.sin_port = htons(8000);listener = socket(AF_INET, SOCK_STREAM, 0);evutil_make_socket_nonblocking(listener);#ifndef WIN32{int one = 1;setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));}
#endifif (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0){perror("bind");return;}if (listen(listener, 16)<0){perror("listen");return;}listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);/*XXX check it */event_add(listener_event, NULL);event_base_dispatch(base);}int main(int c, char **v){// setvbuf(stdout, NULL, _IONBF, 0);run();return 0;}
編譯:gcc?-I/usr/include?-o?test?lowlevel_libevent_server.c?-L/usr/local/lib?-levent
運行結果:
新人創作打卡挑戰賽發博客就能抽獎!定制產品紅包拿不停!
總結
以上是生活随笔為你收集整理的处理大并发之四 libevent demo详细分析(对比epoll)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。