高效并发处理之libevent
出處:http://www.oschina.net/question/12_15837
構建現代的服務器應用程序需要以某種方法同時接收數百、數千甚至數萬個事件,無論它們是內部請求還是網絡連接,都要有效地處理它們的操作。有許多解決方 案,但是 libevent 庫和 libev 庫能夠大大提高性能和事件處理能力。在本文中,我們要討論在 UNIX? 應用程序中使用和部署這些解決方案所用的基本結構和方法。libev 和 libevent 都可以在高性能應用程序中使用,包括部署在 IBM Cloud 或 Amazon EC2 環境中的應用程序,這些應用程序需要支持大量并發客戶端或操作。
簡介
許多服務器部署(尤其是 web 服務器部署)面對的最大問題之一是必須能夠處理大量連接。無論是通過構建基于云的服務來處理網絡通信流,還是把應用程序分布在 IBM Amazon EC 實例上,還是為網站提供高性能組件,都需要能夠處理大量并發連接。
一個好例子是,web 應用程序最近越來越動態了,尤其是使用 AJAX 技術的應用程序。如果要部署的系統允許數千客戶端直接在網頁中更新信息,比如提供事件或問題實時監視的系統,那么提供信息的速度就非常重要了。在網格或云 環境中,可能有來自數千客戶端的持久連接同時打開著,必須能夠處理每個客戶端的請求并做出響應。
在討論 libevent 和 libev 如何處理多個網絡連接之前,我們先簡要回顧一下處理這類連接的傳統解決方案。
處理多個客戶端
處理多個連接有許多不同的傳統方法,但是在處理大量連接時它們往往會產生問題,因為它們使用的內存或 CPU 太多,或者達到了某個操作系統限制。
使用的主要方法如下:
- 循環:早期系統使用簡單的循環選擇解決方案,即循環遍歷打開的網絡連接的列表,判斷是否有要讀取的數 據。這種方法既緩慢(尤其是隨著連接數量增加越來越慢),又低效(因為在處理當前連接時其他連接可能正在發送請求并等待響應)。在系統循環遍歷每個連接 時,其他連接不得不等待。如果有 100 個連接,其中只有一個有數據,那么仍然必須處理其他 99 個連接,才能輪到真正需要處理的連接。
- poll、epoll 和變體:這是對循環方法的改進,它用一個結構保存要監視的每個連接的數組,當在網絡套接字上發現數據時,通過回調機制調用處理函數。poll 的問題是這個結構會非常大,在列表中添加新的網絡連接時,修改結構會增加負載并影響性能。
- 選擇:select() 函數調用使用一個靜態結構,它事先被硬編碼為相當小的數量(1024 個連接),因此不適用于非常大的部署。
在各種平臺上還有其他實現(比如 Solaris 上的 /dev/poll 或 FreeBSD/NetBSD 上的 kqueue),它們在各自的 OS 上性能可能更好,但是無法移植,也不一定能夠解決處理請求的高層問題。
上面的所有解決方案都用簡單的循環等待并處理請求,然后把請求分派給另一個函數以處理實際的網絡交互。關鍵在于循環和網絡套接字需要大量管理代碼,這樣才能監聽、更新和控制不同的連接和接口。
處理許多連接的另一種方法是,利用現代內核中的多線程支持監聽和處理連接,為每個連接啟動一個新線程。這把責任直接交給操作系統,但是會在 RAM 和 CPU 方面增加相當大的開銷,因為每個線程都需要自己的執行空間。另外,如果每個線程都忙于處理網絡連接,線程之間的上下文切換會很頻繁。最后,許多內核并不適 于處理如此大量的活躍線程。
libevent 方法
libevent 庫實際上沒有更換 select()、poll() 或其他機制的基礎。而是使用對于每個平臺最高效的高性能解決方案在實現外加上一個包裝器。
為了實際處理每個請求,libevent 庫提供一種事件機制,它作為底層網絡后端的包裝器。事件系統讓為連接添加處理函數變得非常簡便,同時降低了底層 I/O 復雜性。這是 libevent 系統的核心。
libevent 庫的其他組件提供其他功能,包括緩沖的事件系統(用于緩沖發送到客戶端/從客戶端接收的數據)以及 HTTP、DNS 和 RPC 系統的核心實現。
創建 libevent 服務器的基本方法是,注冊當發生某一操作(比如接受來自客戶端的連接)時應該執行的函數,然后調用主事件循環 event_dispatch()。執行過程的控制現在由 libevent 系統處理。注冊事件和將調用的函數之后,事件系統開始自治;在應用程序運行時,可以在事件隊列中添加(注冊)或刪除(取消注冊)事件。事件注冊非常方便,可以通過它添加新事件以處理新打開的連接,從而構建靈活的網絡處理系統。
例如,可以打開一個監聽套接字,然后注冊一個回調函數,每當需要調用 accept() 函數以打開新連接時調用這個回調函數,這樣就創建了一個網絡服務器。清單 1 所示的代碼片段說明基本過程:
清單 1. 打開監聽套接字,注冊一個回調函數(每當需要調用 accept() 函數以打開新連接時調用它),由此創建網絡服務器
| int main(int argc, char **argv) { ...ev_init();/* Setup listening socket */event_set(&ev_accept, listen_fd, EV_READ|EV_PERSIST, on_accept, NULL);event_add(&ev_accept, NULL);/* Start the event loop. */event_dispatch(); } |
?
event_set() 函數創建新的事件結構,event_add() 在事件隊列機制中添加事件。然后,event_dispatch() 啟動事件隊列系統,開始監聽(并接受)請求。
清單 2 給出一個更完整的示例,它構建一個非常簡單的回顯服務器:
清單 2. 構建簡單的回顯服務器
| #include <event.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <fcntl.h> #include <unistd.h>#define SERVER_PORT 8080 int debug = 0;struct client {int fd;struct bufferevent *buf_ev; };int setnonblock(int fd) {int flags;flags = fcntl(fd, F_GETFL);flags |= O_NONBLOCK;fcntl(fd, F_SETFL, flags); }void buf_read_callback(struct bufferevent *incoming,void *arg) {struct evbuffer *evreturn;char *req;req = evbuffer_readline(incoming->input);if (req == NULL)return;evreturn = evbuffer_new();evbuffer_add_printf(evreturn,"You said %s\n",req);bufferevent_write_buffer(incoming,evreturn);evbuffer_free(evreturn);free(req); }void buf_write_callback(struct bufferevent *bev,void *arg) { }void buf_error_callback(struct bufferevent *bev,short what,void *arg) {struct client *client = (struct client *)arg;bufferevent_free(client->buf_ev);close(client->fd);free(client); }void accept_callback(int fd,short ev,void *arg) {int client_fd;struct sockaddr_in client_addr;socklen_t client_len = sizeof(client_addr);struct client *client;client_fd = accept(fd,(struct sockaddr *)&client_addr,&client_len);if (client_fd < 0){warn("Client: accept() failed");return;}setnonblock(client_fd);client = calloc(1, sizeof(*client));if (client == NULL)err(1, "malloc failed");client->fd = client_fd;client->buf_ev = bufferevent_new(client_fd,buf_read_callback,buf_write_callback,buf_error_callback,client);bufferevent_enable(client->buf_ev, EV_READ); }int main(int argc,char **argv) {int socketlisten;struct sockaddr_in addresslisten;struct event accept_event;int reuse = 1;event_init();socketlisten = socket(AF_INET, SOCK_STREAM, 0);if (socketlisten < 0){fprintf(stderr,"Failed to create listen socket");return 1;}memset(&addresslisten, 0, sizeof(addresslisten));addresslisten.sin_family = AF_INET;addresslisten.sin_addr.s_addr = INADDR_ANY;addresslisten.sin_port = htons(SERVER_PORT);if (bind(socketlisten,(struct sockaddr *)&addresslisten,sizeof(addresslisten)) < 0){fprintf(stderr,"Failed to bind");return 1;}if (listen(socketlisten, 5) < 0){fprintf(stderr,"Failed to listen to socket");return 1;}setsockopt(socketlisten,SOL_SOCKET,SO_REUSEADDR,&reuse,sizeof(reuse));setnonblock(socketlisten);event_set(&accept_event,socketlisten,EV_READ|EV_PERSIST,accept_callback,NULL);event_add(&accept_event,NULL);event_dispatch();close(socketlisten);return 0; } |
?
下面討論各個函數及其操作:
- main():主函數創建用來監聽連接的套接字,然后創建 accept() 的回調函數以便通過事件處理函數處理每個連接。
- accept_callback():當接受連接時,事件系統調用此函數。此函數接受到客戶端的連接;添加客 戶端套接字信息和一個 bufferevent 結構;在事件結構中為客戶端套接字上的讀/寫/錯誤事件添加回調函數;作為參數傳遞客戶端結構(和嵌入的 eventbuffer 和客戶端套接字)。每當對應的客戶端套接字包含讀、寫或錯誤操作時,調用對應的回調函數。
- buf_read_callback():當客戶端套接字有要讀的數據時調用它。作為回顯服務,此函數把 "you said..." 寫回客戶端。套接字仍然打開,可以接受新請求。
- buf_write_callback():當有要寫的數據時調用它。在這個簡單的服務中,不需要此函數,所以定義是空的。
- buf_error_callback():當出現錯誤時調用它。這包括客戶端中斷連接。在出現錯誤的所有場景中,關閉客戶端套接字,從事件列表中刪除客戶端套接字的事件條目,釋放客戶端結構的內存。
- setnonblock():設置網絡套接字以開放 I/O。
當客戶端連接時,在事件隊列中添加新事件以處理客戶端連接;當客戶端中斷連接時刪除事件。在幕后,libevent 處理網絡套接字,識別需要服務的客戶端,分別調用對應的函數。
為了構建這個應用程序,需要編譯 C 源代碼并添加 libevent 庫:$ gcc -o basic basic.c -levent。
從客戶端的角度來看,這個服務器僅僅把發送給它的任何文本發送回來(見 清單 3)。
清單 3. 服務器把發送給它的文本發送回來
| $ telnet localhost 8080 Trying 127.0.0.1... Connected to localhost. Escape character is '^]'. Hello! You said Hello! |
這樣的網絡應用程序非常適合需要處理多個連接的大規模分布式部署,比如 IBM Cloud 系統。
很難通過簡單的解決方案觀察處理大量并發連接的情況和性能改進。可以使用嵌入的 HTTP 實現幫助了解可伸縮性。
使用內置的 HTTP 服務器
如果希望構建本機應用程序,可以使用一般的基于網絡的 libevent 接口;但是,越來越常見的場景是開發基于 HTTP 協議的應用程序,以及裝載或動態地重新裝載信息的網頁。如果使用任何 AJAX 庫,客戶端就需要 HTTP,即使您返回的信息是 XML 或 JSON。
libevent 中的 HTTP 實現并不是 Apache HTTP 服務器的替代品,而是適用于與云和 web 環境相關聯的大規模動態內容的實用解決方案。例如,可以在 IBM Cloud 或其他解決方案中部署基于 libevent 的接口。因為可以使用 HTTP 進行通信,服務器可以與其他組件集成。
要想使用 libevent 服務,需要使用與主要網絡事件模型相同的基本結構,但是還必須處理網絡接口,HTTP 包裝器會替您處理。這使整個過程變成四個函數調用(初始化、啟動 HTTP 服務器、設置 HTTP 回調函數和進入事件循環),再加上發送回數據的回調函數。清單 4 給出一個非常簡單的示例:
清單 4. 使用 libevent 服務的簡單示例
| #include <sys/types.h>#include <stdio.h> #include <stdlib.h> #include <unistd.h>#include <event.h> #include <evhttp.h>void generic_request_handler(struct evhttp_request *req, void *arg) {struct evbuffer *returnbuffer = evbuffer_new();evbuffer_add_printf(returnbuffer, "Thanks for the request!");evhttp_send_reply(req, HTTP_OK, "Client", returnbuffer);evbuffer_free(returnbuffer);return; }int main(int argc, char **argv) {short http_port = 8081;char *http_addr = "192.168.0.22";struct evhttp *http_server = NULL;event_init();http_server = evhttp_start(http_addr, http_port);evhttp_set_gencb(http_server, generic_request_handler, NULL);fprintf(stderr, "Server started on port %d\n", http_port);event_dispatch();return(0); } |
?
應該可以通過前面的示例看出代碼的基本結構,不需要解釋。主要元素是 evhttp_set_gencb() 函數(它設置當收到 HTTP 請求時要使用的回調函數)和generic_request_handler() 回調函數本身(它用一個表示成功的簡單消息填充響應緩沖區)。
HTTP 包裝器提供許多其他功能。例如,有一個請求解析器,它會從典型的請求中提取出查詢參數(就像處理 CGI 請求一樣)。還可以設置在不同的請求路徑中要觸發的處理函數。通過設置不同的回調函數和處理函數,可以使用路徑 '/db/' 提供到數據庫的接口,或使用 '/memc' 提供到 memcached 的接口。
libevent 工具包的另一個特性是支持通用計時器。可以在指定的時間段之后觸發事件。可以通過結合使用計時器和 HTTP 實現提供輕量的服務,從而自動地提供文件內容,在修改文件內容時更新返回的數據。例如,以前要想在新聞頻發的活動期間提供即時更新服務,前端 web 應用程序就需要定期重新裝載新聞稿,而現在可以輕松地提供內容。整個應用程序(和 web 服務)都在內存中,因此響應非常快。
這就是 清單 5 中的示例的主要用途:
清單 5. 使用計時器在新聞頻發的活動期間提供即時更新服務
| #include <sys/types.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/stat.h> #include <event.h> #include <evhttp.h>#define RELOAD_TIMEOUT 5 #define DEFAULT_FILE "sample.html"char *filedata; time_t lasttime = 0; char filename[80]; int counter = 0;void read_file() {int size = 0;char *data;struct stat buf;stat(filename,&buf);if (buf.st_mtime > lasttime){if (counter++)fprintf(stderr,"Reloading file: %s",filename);elsefprintf(stderr,"Loading file: %s",filename);FILE *f = fopen(filename, "rb");if (f == NULL){fprintf(stderr,"Couldn't open file\n");exit(1);}fseek(f, 0, SEEK_END);size = ftell(f);fseek(f, 0, SEEK_SET);data = (char *)malloc(size+1);fread(data, sizeof(char), size, f);filedata = (char *)malloc(size+1);strcpy(filedata,data);fclose(f);fprintf(stderr," (%d bytes)\n",size);lasttime = buf.st_mtime;} }void load_file() {struct event *loadfile_event;struct timeval tv;read_file();tv.tv_sec = RELOAD_TIMEOUT;tv.tv_usec = 0;loadfile_event = malloc(sizeof(struct event));evtimer_set(loadfile_event,load_file,loadfile_event);evtimer_add(loadfile_event,&tv); }void generic_request_handler(struct evhttp_request *req, void *arg) {struct evbuffer *evb = evbuffer_new();evbuffer_add_printf(evb, "%s",filedata);evhttp_send_reply(req, HTTP_OK, "Client", evb);evbuffer_free(evb); }int main(int argc, char *argv[]) {short http_port = 8081;char *http_addr = "192.168.0.22";struct evhttp *http_server = NULL;if (argc > 1){strcpy(filename,argv[1]);printf("Using %s\n",filename);}else{strcpy(filename,DEFAULT_FILE);}event_init();load_file();http_server = evhttp_start(http_addr, http_port);evhttp_set_gencb(http_server, generic_request_handler, NULL);fprintf(stderr, "Server started on port %d\n", http_port);event_dispatch(); } |
?
這個服務器的基本原理與前面的示例相同。首先,腳本設置一個 HTTP 服務器,它只響應對基本 URL 主機/端口組合的請求(不處理請求 URI)。第一步是裝載文件 (read_file())。在裝載最初的文件時和在計時器觸發回調時都使用此函數。
read_file() 函數使用 stat() 函數調用檢查文件的修改時間,只有在上一次裝載之后修改了文件的情況下,它才重新讀取文件的內容。此函數通過調用fread() 裝載文件數據,把數據復制到另一個結構中,然后使用strcpy() 把數據從裝載的字符串轉移到全局字符串中。
load_file() 函數是觸發計時器時調用的函數。它通過調用 read_file() 裝載內容,然后使用 RELOAD_TIMEOUT 值設置計時器,作為嘗試裝載文件之前的秒數。libevent 計時器使用 timeval 結構,允許按秒和毫秒指定計時器。計時器不是周期性的;當觸發計時器事件時設置它,然后從事件隊列中刪除事件。
使用與前面的示例相同的格式編譯代碼:$ gcc -o basichttpfile basichttpfile.c -levent。
現在,創建作為數據使用的靜態文件;默認文件是 sample.html,但是可以通過命令行上的第一個參數指定任何文件(見 清單 6)。
清單 6. 創建作為數據使用的靜態文件
| $ ./basichttpfile Loading file: sample.html (8046 bytes) Server started on port 8081 |
?
現在,程序可以接受請求了,重新裝載計時器也啟動了。如果修改 sample.html 的內容,應該會重新裝載此文件并在日志中記錄一個消息。例如,清單 7 中的輸出顯示初始裝載和兩次重新裝載:
清單 7. 輸出顯示初始裝載和兩次重新裝載
| $ ./basichttpfile Loading file: sample.html (8046 bytes) Server started on port 8081 Reloading file: sample.html (8047 bytes) Reloading file: sample.html (8048 bytes) |
?
注意,要想獲得最大的收益,必須確保環境沒有限制打開的文件描述符數量。可以使用 ulimit 命令修改限制(需要適當的權限或根訪問)。具體的設置取決與您的 OS,但是在 Linux? 上可以用-n 選項設置打開的文件描述符(和網絡套接字)的數量:
清單 8. 用 -n 選項設置打開的文件描述符數量
| $ ulimit -n 1024 |
?
通過指定數字提高限制:$ ulimit -n 20000。
可以使用 Apache Bench 2 (ab2) 等性能基準測試應用程序檢查服務器的性能。可以指定并發查詢的數量以及請求的總數。例如,使用 100,000 個請求運行基準測試,并發請求數量為 1000 個:$ ab2 -n 100000 -c 1000 http://192.168.0.22:8081/。
使用服務器示例中所示的 8K 文件運行這個示例系統,獲得的結果為大約每秒處理 11,000 個請求。請記住,這個 libevent 服務器在單一線程中運行,而且單一客戶端不太可能給服務器造成壓力,因為它還受到打開請求的方法的限制。盡管如此,在交換的文檔大小適中的情況下,這樣的 處理速率對于單線程應用程序來說仍然令人吃驚。
使用其他語言的實現
盡管 C 語言很適合許多系統應用程序,但是在現代環境中不經常使用 C 語言,腳本語言更靈活、更實用。幸運的是,Perl 和 PHP 等大多數腳本語言是用 C 編寫的,所以可以通過擴展模塊使用 libevent 等 C 庫。
例如,清單 9 給出 Perl 網絡服務器腳本的基本結構。accept_callback() 函數與清單 1 所示核心 libevent 示例中的 accept 函數相同。
清單 9. Perl 網絡服務器腳本的基本結構
| my $server = IO::Socket::INET->new(LocalAddr => 'localhost',LocalPort => 8081,Proto => 'tcp',ReuseAddr => SO_REUSEADDR,Listen => 1,Blocking => 0,) or die $@;my $accept = event_new($server, EV_READ|EV_PERSIST, \&accept_callback);$main->add;event_mainloop(); |
?
用這些語言編寫的 libevent 實現通常支持 libevent 系統的核心,但是不一定支持 HTTP 包裝器。因此,對腳本編程的應用程序使用這些解決方案會比較復雜。有兩種方法:要么把腳本語言嵌入到基于 C 的 libevent 應用程序中,要么使用基于腳本語言環境構建的眾多 HTTP 實現之一。例如,Python 包含功能很強的 HTTP 服務器類 (httplib/httplib2)。
應該指出一點:在腳本語言中沒有什么東西是無法用 C 重新實現的。但是,要考慮到開發時間的限制,而且與現有代碼集成可能更重要。
libev 庫
與 libevent 一樣,libev 系統也是基于事件循環的系統,它在 poll()、select() 等機制的本機實現的基礎上提供基于事件的循環。到我撰寫本文時,libev 實現的開銷更低,能夠實現更好的基準測試結果。libev API 比較原始,沒有 HTTP 包裝器,但是 libev 支持在實現中內置更多事件類型。例如,一種 evstat 實現可以監視多個文件的屬性變動,可以在清單 4 所示的 HTTP 文件解決方案中使用它。
但是,libevent 和 libev 的基本過程是相同的。創建所需的網絡監聽套接字,注冊在執行期間要調用的事件,然后啟動主事件循環,讓 libev 處理過程的其余部分。
例如,可以使用 Ruby 接口按照與清單 1 相似的方式提供回顯服務器,見 清單 10。
清單 10. 使用 Ruby 接口提供回顯服務器
| require 'rubygems' require 'rev'PORT = 8081class EchoServerConnection < Rev::TCPSocketdef on_read(data)write 'You said: ' + dataend endserver = Rev::TCPServer.new('192.168.0.22', PORT, EchoServerConnection) server.attach(Rev::Loop.default)puts "Listening on localhost:#{PORT}" Rev::Loop.default.run |
?
Ruby 實現尤其出色,因為它為許多常用的網絡解決方案提供了包裝器,包括 HTTP 客戶端、OpenSSL 和 DNS。其他腳本語言實現包括功能全面的 Perl 和 Python 實現,您可以試一試。
結束語
libevent 和 libev 都提供靈活且強大的環境,支持為處理服務器端或客戶端請求實現高性能網絡(和其他 I/O)接口。目標是以高效(CPU/RAM 使用量低)的方式支持數千甚至數萬個連接。在本文中,您看到了一些示例,包括 libevent 中內置的 HTTP 服務,可以使用這些技術支持基于 IBM Cloud、EC2 或 AJAX 的 web 應用程序
總結
以上是生活随笔為你收集整理的高效并发处理之libevent的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Lighttpd源码分析之状态机与插件
- 下一篇: 处理大并发之四 libevent dem