epoll 系列系统调用
epoll是linux 特有的I/O復用函數,它在實現和使用上和select ,poll有很大的差異!首先epoll使用一組函數來完成任務,而不是單個函數。其實epoll吧用戶關心的文件描述符的事件放在內核的一個事件表里,而無須像select? 和poll那樣每次調用都要重復傳入文件描述符或事件集。但epoll需要使用一個額外的文件描述符,來唯一標識內核中的一個事件表,這文件描述符使用如下的epoll_create函數來創建:
下面的函數用來操作epoll的內核事件表:
其中event成員描述事件類型!epoll支持的事件類型和poll基本相同,但epoll有兩個額外的事件類型-EPOLLET和EPOLLONESHOT。它們對于epoll的高效運作非常關鍵。data成員用于存儲用戶數據,其類型epoll_data_t的定義如下:
epoll系列系統調用的主要接口是epoll_wait函數。它在一段時間內等待一組文件描述符上的事件 ,其原型如下:
該函數成功時返回就緒的文件描述符的個數,失敗時返回-1并設置errno。maxevents參數指定最多監聽多少個事件,它必須大于0。
epoll_wait函數如果檢測到事件,就將所有就緒的事件從內核事件表中復制到它的第二個參數events指向的數組中,這數組只用于輸出epoll_wait檢測到的就緒事件。
epoll對文件描述符的操作有兩種模式:LT和ET模式。LT模式是默認的工作模式,這種模式下epoll相當于一個效率較高的poll。當往epoll內核事件表中注冊一個文件描述符上的EPOLLET事件時,epoll將以ET模式來操作該文件描述符。ET模式是epoll的搞笑模式!
對于采用LT工作模式的文件描述符,當epoll_wait檢測到其上事件發送并將此事件通知應用程序后,應用程序可以不立即處理該事件。這樣當應用程序下次調用epoll_wait時,epoll_wait還會再次向應用程序通知此事件,直到該事件被處理。而對于ET模式的文件描述符,當epoll_wait檢測到其上有事件發生并將此事件通知應用程序后,應用程序應該立即處理該事件。因為后續的epoll_wait調用將不再向應用程序通知這一事件。可見,ET模式在很大程序上降低了同一個epoll事件被重復觸發的次數,因此效率要比LT模式要高。
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h>#define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 10int setnonblocking( int fd ) {int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option; }void addfd( int epollfd, int fd, bool enable_et ) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN;if( enable_et ){event.events |= EPOLLET;}epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd ); }void lt( epoll_event* events, int number, int epollfd, int listenfd ) {char buf[ BUFFER_SIZE ];for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;if ( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );addfd( epollfd, connfd, false );}else if ( events[i].events & EPOLLIN ){printf( "event trigger once\n" );memset( buf, '\0', BUFFER_SIZE );int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );if( ret <= 0 ){close( sockfd );continue;}printf( "get %d bytes of content: %s\n", ret, buf );}else{printf( "something else happened \n" );}} }void et( epoll_event* events, int number, int epollfd, int listenfd ) {char buf[ BUFFER_SIZE ];for ( int i = 0; i < number; i++ ){int sockfd = events[i].data.fd;if ( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );addfd( epollfd, connfd, true );}else if ( events[i].events & EPOLLIN ){printf( "event trigger once\n" );while( 1 ){memset( buf, '\0', BUFFER_SIZE );int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );if( ret < 0 ){if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ){printf( "read later\n" );break;}close( sockfd );break;}else if( ret == 0 ){close( sockfd );}else{printf( "get %d bytes of content: %s\n", ret, buf );}}}else{printf( "something else happened \n" );}} }int main( int argc, char* argv[] ) {if( argc <= 2 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}const char* ip = argv[1];int port = atoi( argv[2] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;inet_pton( AF_INET, ip, &address.sin_addr );address.sin_port = htons( port );int listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );epoll_event events[ MAX_EVENT_NUMBER ];int epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd, true );while( 1 ){int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ret < 0 ){printf( "epoll failure\n" );break;}lt( events, ret, epollfd, listenfd );//et( events, ret, epollfd, listenfd );}close( listenfd );return 0; }EPOLLONESHOT事件
即使我們使用ET模式,一個socket上的某個事件還是可能被觸發多次。這在并發程序中就會引起一個問題,比如一個線程在讀取完某個socket上的數據后開始處理這些數據,而在數據的處理過程中該socket上又有新數據可讀(EPOLLIN再次被觸發),此時另外個線程就被喚醒來讀取這些新的數據。于是就出現了兩個線程同時操作一個socket的局面。這當然不是我們所期望的,我們期望的是一個socket連接在任一時刻都只被一個線程處理!這一點我們可以使用epoll的 EPOLLONESHOT事件來實現!
對于注冊了EPOLLONESHOT事件的文件描述符,操作系統最多觸發其上注冊的一個可讀 可寫或異常事件,且只觸發一次!除非我們使用epoll_ctl函數重置該文件描述符上注冊的EPOLLONESHOT事件,這樣,當一個線程在處理某個socket時,其他線程是不可能有機會操作該socket的。但反過來思考該線程應該立即重置這socket上的EPOLLONESHOT事件,以確保這socket下一次可讀時,其EPOLLIN事件被觸發,進而讓其他工作線程有機會繼續處理這socket。
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h>#define MAX_EVENT_NUMBER 1024 #define BUFFER_SIZE 1024 struct fds {int epollfd;int sockfd; };int setnonblocking( int fd ) {int old_option = fcntl( fd, F_GETFL );int new_option = old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option; }void addfd( int epollfd, int fd, bool oneshot ) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET;if( oneshot ){event.events |= EPOLLONESHOT;}epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );setnonblocking( fd ); }void reset_oneshot( int epollfd, int fd ) {epoll_event event;event.data.fd = fd;event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event ); }void* worker( void* arg ) {int sockfd = ( (fds*)arg )->sockfd;int epollfd = ( (fds*)arg )->epollfd;printf( "start new thread to receive data on fd: %d\n", sockfd );char buf[ BUFFER_SIZE ];memset( buf, '\0', BUFFER_SIZE );while( 1 ){int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );if( ret == 0 ){close( sockfd );printf( "foreiner closed the connection\n" );break;}else if( ret < 0 ){if( errno == EAGAIN ){reset_oneshot( epollfd, sockfd );printf( "read later\n" );break;}}else{printf( "get content: %s\n", buf );sleep( 5 );}}printf( "end thread receiving data on fd: %d\n", sockfd ); }int main( int argc, char* argv[] ) {if( argc <= 2 ){printf( "usage: %s ip_address port_number\n", basename( argv[0] ) );return 1;}const char* ip = argv[1];int port = atoi( argv[2] );int ret = 0;struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;inet_pton( AF_INET, ip, &address.sin_addr );address.sin_port = htons( port );int listenfd = socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd >= 0 );ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );assert( ret != -1 );ret = listen( listenfd, 5 );assert( ret != -1 );epoll_event events[ MAX_EVENT_NUMBER ];int epollfd = epoll_create( 5 );assert( epollfd != -1 );addfd( epollfd, listenfd, false );while( 1 ){int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ret < 0 ){printf( "epoll failure\n" );break;}for ( int i = 0; i < ret; i++ ){int sockfd = events[i].data.fd;if ( sockfd == listenfd ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );addfd( epollfd, connfd, true );}else if ( events[i].events & EPOLLIN ){pthread_t thread;fds fds_for_new_worker;fds_for_new_worker.epollfd = epollfd;fds_for_new_worker.sockfd = sockfd;pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );}else{printf( "something else happened \n" );}}}close( listenfd );return 0; }總結
以上是生活随笔為你收集整理的epoll 系列系统调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 正确使用Core Data多线程的3种方
- 下一篇: (王道408考研操作系统)第二章进程管理