高效的半同步/半异步模式的实现
先介紹一下半同步/半異步模式:
首先半同步/半異步模式中的同步和異步和前面的IO模型中的同步和異步是完全不用的概念。在IO模型中,同步和異步區分的是內核向應用程序通知的是何種IO事件(是就緒事件還是完成事件),以及該由誰來完成IO讀寫(是應用程序還是內核)。在并發模式中,同步指的是程序完全按照代碼序列的順序執行,異步指的是程序的執行需要由系統事件來驅動。常見的系統事件包括中斷 信號等。
比如8-8a描述了同步的讀操作 ,圖8-8b則描述了異步的讀操作。
按照同步方式運行的線程稱為同步線程,按照異步方式運行的線程成為異步線程。顯然異步線程的執行效率高,實時性強,這是很多嵌入式程序采用的模型。但編寫異步方式執行的程序相對復雜,難于調試和擴展,且不適合大量的并發。而同步線程則相反,它雖然效率比較低,實時性較差,但邏輯簡單。因此,對于像服務器這種既要求較好的實時性,又要求同時處理多個客戶請求的應用程序,我們就應該同時使用同步線程和異步線程來實現。即使用半同步/半異步模式來實現!
半同步/半異步模式中,同步線程用于處理客戶邏輯,異步線程用于處理IO事件。異步線程監聽到客戶請求后,就將其封裝成請求對象并插入請求隊列中。請求隊列將通知某個工作在同步模式的工作線程來讀取并處理該對象。具體選擇哪個工作線程來為新的客戶請求服務,則取決于請求隊列的設計。
下面圖8-9總結了半同步/半異步的工作流程
在服務器程序中,如果結合考慮兩種事件處理模式和幾種IO模型,則半同步/半異步模式就存在多種變體。其中有一種變體成為半同步/半反應對模式,如下圖8-10所示
圖8-10中,主線程插入請求隊列中的任務就是就緒的連接socket.這說明該圖所示的半同步/半反應堆模式采用的事件處理模式是Reactor模式,它要求工作線程自己從socket上讀取客戶請求和往socket寫入服務器應答。實際上,半同步/半反應對模式也可以使用模擬的Proactor事件處理模式,即由主線程來完成數據的讀寫。在這種情況下,主線程一般會將應用程序數據 任務類型等信息封裝為一個任務對象,如何將其插入請求隊列。工作線程從請求隊列中取得任務對象之后,即可直接處理,而無須執行讀寫操作了。
可見 圖8-11中,每個線程都維持著自己的事件循環,它們各自獨立監聽不同的事件,因此在這種搞笑的半同步/半異步模式中,每個線程都工作在異步模式。
下面吧大概的代碼展示一下
#include "SocketServer.h"SocketServer::SocketServer(void) :m_nport(5001),m_epollfd(-1),m_bindsocket(-1),m_bstop(false) { }SocketServer::~SocketServer(void) {}void SocketServer::S_WorkService(void* arg) {int epollfd = *(int*)arg;SOCKETServer::Instance()->WorkService(epollfd); }void SocketServer::WorkService(int epollfd) {epoll_event events[ MAX_EVENT_NUMBER ]; while( !m_bstop ){int event_num = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( event_num < 0 && (errno != EINTR)){DC_ERROR("epoll_wait error ,errmsg = %s",strerror(errno));break;}for ( int i = 0; i < event_num; i++ ){int sock = events[i].data.fd;struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );getpeername(sock, (struct sockaddr *)&client_address, &client_addrlength); char remoteAddress[INET_ADDRSTRLEN ] = {0};inet_ntop( AF_INET, &client_address.sin_addr, remoteAddress, INET_ADDRSTRLEN );int remotePort = ntohs( client_address.sin_port );if ( events[i].events & EPOLLIN ){char* pRecvBuff = new char[SOCKET_BUF_SIZE];int nRemainDataSize = 0;while( true ){int nBytesThisTime = recv( sock, pRecvBuff + nRemainDataSize, SOCKET_BUF_SIZE -1-nRemainDataSize, 0 );if( nBytesThisTime < 0 ){if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) ){break;}DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);del_socket_epoll(epollfd,sock);break;}else if (nBytesThisTime == 0){DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);nRemainDataSize = 0;del_socket_epoll(epollfd,sock);break;}nRemainDataSize += nBytesThisTime;}//如果讀取失敗的話就直接返回if(nRemainDataSize == 0){continue;}DC_INFO("recv %s:%d data size = %d" ,remoteAddress ,remotePort,nRemainDataSize);delete(pRecvBuff);pRecvBuff = NULL;if(0 != reset_socket_epoll(epollfd, sock)){del_socket_epoll(epollfd,sock);}}else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) ){DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);del_socket_epoll(epollfd,sock);}else{DC_ERROR("socket errormsg = %s , %s:%d close",strerror(errno),remoteAddress ,remotePort);}}}} int SocketServer::StartServer(int port ,int threadNum ) {m_nport = port;m_nThreadNum = threadNum;m_bindsocket = socket( PF_INET, SOCK_STREAM, 0 );if(m_bindsocket < 0){DC_ERROR("socket error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}/*地址可復用 time_wait*/ if(0 != make_socket_reuseable(m_bindsocket)){return SERVER_ERROR;}/*設置超時時間*/if(0 != make_socket_timeout(m_bindsocket,SOCKET_TIMEOUT)){return SERVER_ERROR;}/*設置緩沖區大小*/if(0 != make_socket_buffsize(m_bindsocket,SOCKET_BUF_SIZE)){return SERVER_ERROR;}/*設置非阻塞*/if(0 != make_socket_nonblock(m_bindsocket)){return SERVER_ERROR;}/*綁定地址和端口*/struct sockaddr_in address;bzero( &address, sizeof( address ) );address.sin_family = AF_INET;inet_pton( AF_INET, "0.0.0.0", &address.sin_addr );address.sin_port = htons( m_nport );if(0 != bind( m_bindsocket, ( struct sockaddr* )&address, sizeof(address) )){DC_ERROR("bind %d error ,errmsg = %s",m_nport,strerror(errno));return SERVER_ERROR;}/*監聽端口*/if(0 != listen(m_bindsocket, 128)){DC_ERROR("bind %d error ,errmsg = %s",m_nport,strerror(errno));return SERVER_ERROR;}/*創建工作線程以及對應的epoll句柄*/for(int i = 0 ;i < m_nThreadNum ;i++){int epollfd = epoll_create( 5 );swartz_thread_detached_create((void*)S_WorkService, (void*)&epollfd, 0, 0); //休眠50ms ,保證工作線程里面的epollfd是正確的usleep(50*1000);m_EpollVec.push_back(epollfd);}/*epoll 監聽*/epoll_event events[ MAX_EVENT_NUMBER ];m_epollfd = epoll_create( 5 );if(m_epollfd == -1){DC_ERROR("epoll_create error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}if(0 != add_socket_epoll(m_epollfd, m_bindsocket,false)){return SERVER_ERROR;}while( 1 ){static int ClusterNum = 0;int event_num = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( event_num < 0 && (errno != EINTR)){DC_ERROR("epoll_wait error ,errmsg = %s",strerror(errno));break;}for ( int i = 0; i < event_num; i++ ){int sockfd = events[i].data.fd;if ( sockfd == m_bindsocket ){struct sockaddr_in client_address;socklen_t client_addrlength = sizeof( client_address );int clientfd = accept( m_bindsocket, ( struct sockaddr* )&client_address, &client_addrlength );if(clientfd < 0){DC_ERROR("accept error ,errmsg = %s",strerror(errno));continue ;}char remoteAddress[INET_ADDRSTRLEN ] = {0};inet_ntop( AF_INET, &client_address.sin_addr, remoteAddress, INET_ADDRSTRLEN );int remotePort = ntohs( client_address.sin_port );DC_INFO("%s:%d connect" , remoteAddress, remotePort );/*設置非阻塞*/if(0 != make_socket_nonblock(clientfd)){continue;}if(0 != add_socket_epoll(m_EpollVec[ClusterNum%m_nThreadNum], clientfd,true)){continue;}ClusterNum++;}else{DC_INFO("other thing happened ,event = %d ",events[i].events);} }}m_bstop = true;StopServer();return SERVER_OK; }int SocketServer::make_socket_nonblock(int sock) {int flags;if ((flags = fcntl(sock, F_GETFL, NULL)) < 0) {DC_ERROR("fcntl(%d, F_GETFL) ,ermsg = %s", sock,strerror(errno));return SERVER_ERROR;}if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1){DC_ERROR("fcntl(%d, F_SETFL) O_NONBLOCK ,ermsg = %s", sock,strerror(errno));return SERVER_ERROR;}return SERVER_OK;}int SocketServer::make_socket_reuseable(int sock) {int reuse = 1;if(0 != setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof( reuse ))){DC_ERROR("setsockopt SO_REUSEADDR error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}struct linger tcpLinger;tcpLinger.l_onoff = 1;tcpLinger.l_linger = 0;if (0 != setsockopt(sock, SOL_SOCKET, SO_LINGER, &tcpLinger, sizeof(tcpLinger))){DC_ERROR("setsockopt SO_LINGER error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}return SERVER_OK; }int SocketServer::make_socket_timeout(int sock,int time) {/*查詢和設置發送超時時間*/struct timeval send_timeout; send_timeout.tv_sec = time; send_timeout.tv_usec = 0; int len = sizeof( timeval ); if(0 != setsockopt( sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, sizeof(send_timeout) )){DC_ERROR("setsockopt SO_SNDTIMEO error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}getsockopt( sock, SOL_SOCKET, SO_SNDTIMEO, &send_timeout, ( socklen_t* )&len);DC_INFO( "the send timeout after settting is %ds", send_timeout.tv_sec/1000 );/*查詢和設置接收超時時間*/struct timeval recv_timeout; recv_timeout.tv_sec = time; recv_timeout.tv_usec = 0; if(0 != setsockopt( sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, sizeof( recv_timeout) )){DC_ERROR("setsockopt SO_RCVTIMEO error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}getsockopt( sock, SOL_SOCKET, SO_RCVTIMEO, &recv_timeout, ( socklen_t* )&len);DC_INFO( "the recv timeout after setting is %ds", recv_timeout.tv_sec/1000 );return SERVER_OK; } int SocketServer::make_socket_buffsize(int sock,int size) {/*查詢和設置接收緩沖區*/int recvbuf = 0;int len = sizeof( recvbuf );getsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, ( socklen_t* )&len);DC_INFO( "the receive buffer size before settting is %d", recvbuf );recvbuf = size;if(0 != setsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, sizeof( recvbuf) )){DC_ERROR("setsockopt SO_RCVBUF error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}getsockopt( sock, SOL_SOCKET, SO_RCVBUF, &recvbuf, ( socklen_t* )&len);DC_INFO( "the receive buffer size after settting is %d", recvbuf );/*查詢和設置發送緩沖區*/int sendbuf = 0;getsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, ( socklen_t* )&len);DC_INFO( "the tcp send buffer size before setting is %d", sendbuf );sendbuf = size;if(0 != setsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, sizeof( sendbuf) )){DC_ERROR("setsockopt SO_SNDBUF error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}getsockopt( sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, ( socklen_t* )&len);DC_INFO( "the tcp send buffer size after setting is %d", sendbuf );return SERVER_OK; }int SocketServer::add_socket_epoll(int epollfd ,int socket,bool oneshot) {epoll_event event;event.data.fd = socket;event.events = EPOLLIN | EPOLLET ;if( oneshot ){event.events |= EPOLLONESHOT;}if(0 != epoll_ctl( epollfd, EPOLL_CTL_ADD, socket, &event )){DC_ERROR("epoll_ctl error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}return SERVER_OK; }int SocketServer::reset_socket_epoll(int epollfd ,int socket) {epoll_event event;event.data.fd = socket;event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;if(0 != epoll_ctl( epollfd, EPOLL_CTL_MOD, socket, &event )){DC_ERROR("epoll_ctl error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}return SERVER_OK; }int SocketServer::del_socket_epoll(int epollfd ,int socket) {if(0 != epoll_ctl( epollfd, EPOLL_CTL_DEL, socket,0)){DC_ERROR("epoll_ctl error ,errmsg = %s",strerror(errno));return SERVER_ERROR;}close(socket);return SERVER_OK; }void SocketServer::StopServer() {/*關閉所有的文件描述符*/for (int sockfd = 3; sockfd < getdtablesize(); sockfd++){close(sockfd);} } #ifndef _SOCKER_SERVER_H #define _SOCKER_SERVER_H#include "common.h"class SocketServer { public:SocketServer(void);~SocketServer(void);public:int StartServer(int port = 5001,int threadNum = 10);void StopServer(); static void S_WorkService(void* arg);void WorkService(int epollfd); private:int make_socket_nonblock(int sock); int make_socket_reuseable(int sock);int make_socket_timeout(int sock,int time); int make_socket_buffsize(int sock,int size); int add_socket_epoll(int epollfd ,int socket,bool oneshot);int reset_socket_epoll(int epollfd ,int socket);int del_socket_epoll(int epollfd ,int socket);private:bool m_bstop; //是否關閉int m_nport; //端口號int m_epollfd; //主線程監聽epoll 句柄int m_bindsocket; //監聽的socketint m_nThreadNum; //接收數據工作線程的數量std::vector<int> m_EpollVec ; //工作線程的epoll句柄集 };typedef singleton<SocketServer> SOCKETServer;#endif總結
以上是生活随笔為你收集整理的高效的半同步/半异步模式的实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .net 调用java service
- 下一篇: git中常用命令小结