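WinSock I/O Model -- The IOCP Model
Preface
IOCP stands for Input/Output Completion Ports, usually just called "completion ports". In this article we use the abbreviation IOCP.
IOCP is the most complex of the Winsock I/O models covered so far, but it is also the one that usually delivers the best system performance. If your network application has to manage a large number of socket I/O requests, you may have no other choice.
In this article we walk through an official IOCP demo program to show how to use IOCP. Because IOCP is complex, we focus on how to use it rather than on its internal implementation. For more details, see the official documentation.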
The official program is available at:
https://github.com/microsoft/Windows-classic-samples/tree/master/Samples/Win7Samples/netds/winsock/iocp/serverex
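Personally, I find the official demo code somewhat unpleasant to read (its formatting, plus a number of trivial details that could be left out). At the end of this article I include my own trimmed-down version for easier reading. Use it as you see fit.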
API Basics
The following building blocks, which we rely on here:
- OVERLAPPED 結構體
- WSAEvent
- CriticalSection
- CreateThread
along with the related background, were already covered in an earlier article in this series, WinSock I/O Model -- OVERLAPPED I/O Model, so we won't repeat them here.
CreateIoCompletionPort
CreateIoCompletionPort creates an IOCP handle, or associates an existing socket handle with an IOCP that has already been created.
HANDLE WINAPI CreateIoCompletionPort(
    _In_     HANDLE    FileHandle,
    _In_opt_ HANDLE    ExistingCompletionPort,
    _In_     ULONG_PTR CompletionKey,
    _In_     DWORD     NumberOfConcurrentThreads
);
GetQueuedCompletionStatus
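GetQueuedCompletionStatus dequeues an I/O completion packet from the specified IOCP.
I/O completion packet: in plain terms, when we create an IOCP, the system allocates a queue for it. The queue holds information about completed asynchronous operations on every file handle associated with that IOCP, and each such entry is called an I/O completion packet.
This API removes I/O completion packets from that queue. Because it is a queue, even when several threads dequeue from the same IOCP at the same time, no two of them receive the same I/O completion packet.
There is also a more advanced function, GetQueuedCompletionStatusEx, which can dequeue several packets in one call. We don't use it here, so we set it aside.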
BOOL GetQueuedCompletionStatus(
    HANDLE       CompletionPort,
    LPDWORD      lpNumberOfBytesTransferred,
    PULONG_PTR   lpCompletionKey,
    LPOVERLAPPED *lpOverlapped,
    DWORD        dwMilliseconds
);
Return value:
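When the function successfully dequeues an I/O completion packet, it returns TRUE, and lpNumberOfBytesTransferred, lpCompletionKey and lpOverlapped are filled in with the data belonging to that packet.
When the function fails, it returns FALSE. In that case the output parameters can look like this:
- *lpOverlapped is NULL: no completion information was dequeued from the IOCP (for example, the wait timed out). lpNumberOfBytesTransferred and lpCompletionKey contain no valid information.
- *lpOverlapped is not NULL: a packet was dequeued, but the asynchronous operation it describes failed. lpNumberOfBytesTransferred, lpCompletionKey and lpOverlapped hold the information about that failed operation; call GetLastError for the detailed error.
If the function returns FALSE, *lpOverlapped is NULL and GetLastError returns ERROR_ABANDONED_WAIT_0, the IOCP handle was closed while the call was waiting.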
HasOverlappedIoCompleted
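HasOverlappedIoCompleted is a macro that reports whether the asynchronous I/O operation associated with a given OVERLAPPED structure has completed. It tests a single operation, not the IOCP as a whole.
BOOL HasOverlappedIoCompleted(LPOVERLAPPED lpOverlapped);
lpOverlapped is the OVERLAPPED structure that was passed to the pending asynchronous operation. The macro evaluates to TRUE once that operation has completed.
Only use this macro after the call that started the operation has reported ERROR_IO_PENDING. If the operation never entered the pending state, do not use it.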
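We now know how to create an IOCP and how to be notified when an asynchronous operation completes. Next, let's see how to submit an asynchronous operation.
Note that we only look at these APIs as they are used with IOCP, and we won't cover the other modes they support.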
AcceptEx
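AcceptEx accepts a new connection asynchronously. It can also receive the first block of data sent by the client in the same operation.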
BOOL AcceptEx(
    SOCKET       sListenSocket,
    SOCKET       sAcceptSocket,
    PVOID        lpOutputBuffer,
    DWORD        dwReceiveDataLength,
    DWORD        dwLocalAddressLength,
    DWORD        dwRemoteAddressLength,
    LPDWORD      lpdwBytesReceived,
    LPOVERLAPPED lpOverlapped
);
Return value:
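- If the call succeeds immediately, the function returns TRUE.
- If it does not succeed immediately, it returns FALSE; call WSAGetLastError for the specific error. If WSAGetLastError returns ERROR_IO_PENDING, the accept operation was submitted successfully and is still in progress.
Worth mentioning: the official documentation notes two advantages over accept. AcceptEx combines several socket operations into a single API/kernel transition. Because it is overlapped, it also lets a small number of threads serve a large number of clients. That is why we prefer it to accept here.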
WSARecv
WSARecv receives data on a connected socket.
int WSAAPI WSARecv(
    SOCKET                             s,
    LPWSABUF                           lpBuffers,
    DWORD                              dwBufferCount,
    LPDWORD                            lpNumberOfBytesRecvd,
    LPDWORD                            lpFlags,
    LPWSAOVERLAPPED                    lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
The lpOverlapped parameter here plays the same role as the one in AcceptEx.
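lpBuffers points to an array of WSABUF structures describing the buffers that receive the data. dwBufferCount gives the number of buffers in the array.
lpNumberOfBytesRecvd: if the read completes immediately, this receives the number of bytes read. If the operation is pending instead, the value is meaningless. The documentation recommends passing NULL here whenever lpOverlapped is not NULL.
lpCompletionRoutine: we don't use it in this example, so we pass NULL. Instead, we use GetQueuedCompletionStatus to learn asynchronously that the receive has completed.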
WSASend
WSASend sends data on a connected socket.
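int WSAAPI WSASend(
    SOCKET                             s,
    LPWSABUF                           lpBuffers,
    DWORD                              dwBufferCount,
    LPDWORD                            lpNumberOfBytesSent,
    DWORD                              dwFlags,
    LPWSAOVERLAPPED                    lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
This function is nearly identical to WSARecv, so we won't repeat the details. The one visible difference is that dwFlags is passed by value rather than through a pointer.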
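Implementation Approach
The overall flow is easy to state, but it hides a great many details, so let's go through the design of this IOCP server step by step.
First, here is the structure the sample code uses as lpCompletionKey: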
typedef struct _PER_SOCKET_CONTEXT {
    SOCKET                      Socket;
    LPFN_ACCEPTEX               fnAcceptEx;
    PPER_IO_CONTEXT             pIOContext;
    struct _PER_SOCKET_CONTEXT *pCtxtBack;
    struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;
Socket: when an operation completes, we need to know which socket it belongs to. GetQueuedCompletionStatus does not report that, so we have to store it ourselves.
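fnAcceptEx: this field exists because of a peculiarity of AcceptEx. The documentation requires us not to call AcceptEx directly. Instead, we obtain a pointer to it at run time by calling WSAIoctl with SIO_GET_EXTENSION_FUNCTION_POINTER. The pointer is obtained through a particular socket (strictly speaking, it belongs to that socket's service provider), so the demo stores one per listening socket. With several listening sockets you may end up with several pointers, which is why this field has to be kept.
pIOContext: holds the OVERLAPPED-related data needed to run asynchronous operations on this socket. We look at this structure in more detail shortly.
pCtxtBack and pCtxtForward: these really aren't necessary. If you keep track of your _PER_SOCKET_CONTEXT structures some other way, you don't need either field.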
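After the server socket has been associated with the IOCP, we need other threads that call GetQueuedCompletionStatus to process completed asynchronous operations. Two questions need weighing here. How many threads should we use? And should they block indefinitely, or wait with a timeout and poll in rounds? You will have to decide both for your own application.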
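The server socket is now associated with the IOCP, and threads are ready to handle completion notifications. How do we make the server socket start accepting new connections? With accept? No: accept blocks, so we don't use it here. Instead, we use AcceptEx to accept new connections asynchronously. How does that work?
The key point about AcceptEx is that we need an OVERLAPPED structure first. Could we simply create a bare OVERLAPPED instance and use it? That isn't wrong as such, but none of the IOCP implementations I have seen do it that way. I have read two besides Microsoft's official demo, one of them libuv.
Instead, they all wrap the OVERLAPPED structure inside another structure. The one in the demo looks like this: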
typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED           Overlapped;
    char                    Buffer[MAX_BUFF_SIZE];
    WSABUF                  wsabuf;
    int                     nTotalBytes;
    int                     nSentBytes;
    IO_OPERATION            IOOperation;
    SOCKET                  SocketAccept;
    struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;
Note that _PER_SOCKET_CONTEXT (our lpCompletionKey) refers to this _PER_IO_CONTEXT through its pIOContext field.
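Overlapped: this field obviously has to be there.
IOOperation: the type of the current asynchronous operation, of type IO_OPERATION: accept, read, or write (ClientIoAccept, ClientIoRead, ClientIoWrite).
SocketAccept: if the current asynchronous operation is an accept, this field stores the socket for the newly accepted connection.
wsabuf: the structure we pass to WSARecv or WSASend when we post a read or a write.
Buffer: where the data is actually stored. A WSABUF holds only a pointer to a buffer and that buffer's length. In this demo's design, wsabuf's pointer always points at Buffer, so all data sent or received lives there.
nTotalBytes, nSentBytes: the number of bytes to send (or that were received) and the number of bytes sent so far.
pIOContextForward: each socket is associated with one _PER_SOCKET_CONTEXT, and a _PER_SOCKET_CONTEXT holds only one _PER_IO_CONTEXT (that is, one OVERLAPPED). How, then, do we run several asynchronous operations on one socket at the same time? We need several _PER_IO_CONTEXT instances, and this linked list chains them together.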
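The demo needs the nTotalBytes / nSentBytes pair for one reason: a send completion can report fewer bytes than were requested. When that happens, the demo's ClientIoWrite branch posts another WSASend for the remainder. Here is that bookkeeping on its own, as portable C with no Winsock calls. DemoSendState and DemoOnSendCompleted are hypothetical names.

```c
#include <stddef.h>

/* Hypothetical mirror of the demo's Buffer / nTotalBytes / nSentBytes fields. */
typedef struct {
    char *Buffer;
    int   nTotalBytes;   /* bytes that must be sent in total */
    int   nSentBytes;    /* bytes confirmed sent so far */
} DemoSendState;

/* Call when a send completion reports `transferred` bytes. Returns 1 and
   sets *next / *nextLen to the unsent remainder if another send must be
   posted; returns 0 (and clears them) when everything has been sent. */
int DemoOnSendCompleted(DemoSendState *st, int transferred,
                        char **next, int *nextLen) {
    st->nSentBytes += transferred;
    if (st->nSentBytes < st->nTotalBytes) {
        *next = st->Buffer + st->nSentBytes;
        *nextLen = st->nTotalBytes - st->nSentBytes;
        return 1;
    }
    *next = NULL;
    *nextLen = 0;
    return 0;
}
```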
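The one thing worth noting here is that the OVERLAPPED structure is the first field of _PER_IO_CONTEXT. When GetQueuedCompletionStatus hands back the lpOverlapped of a completed operation, we can therefore cast that pointer directly to _PER_IO_CONTEXT and learn exactly which I/O operation it was. Meanwhile, _PER_SOCKET_CONTEXT comes back from GetQueuedCompletionStatus as lpCompletionKey, so we also have the socket's full context.
With this design, one _PER_IO_CONTEXT corresponds to one asynchronous operation. If a socket has several operations outstanding, it needs several _PER_IO_CONTEXT structures.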
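The cast works because C guarantees that a pointer to a structure, suitably converted, points to the structure's first member, and the reverse also holds. Sometimes the OVERLAPPED cannot be the first member, for example when a context holds separate OVERLAPPEDs for receiving and sending. For that case, Windows provides the CONTAINING_RECORD macro. The portable sketch below uses a stand-in type. FakeOverlapped, DemoIoContext, DemoIoContext2 and DEMO_CONTAINING_RECORD are hypothetical names; the last one mirrors CONTAINING_RECORD.

```c
#include <stddef.h>

/* Stand-in for WSAOVERLAPPED so the sketch compiles on any platform. */
typedef struct {
    size_t Internal;
    size_t InternalHigh;
} FakeOverlapped;

/* Layout like the demo: the OVERLAPPED is the first member. */
typedef struct {
    FakeOverlapped Overlapped;
    int            IOOperation;
} DemoIoContext;

/* Alternative layout: the OVERLAPPED is NOT the first member. */
typedef struct {
    int            IOOperation;
    FakeOverlapped Overlapped;
} DemoIoContext2;

/* Same idea as the Windows CONTAINING_RECORD macro: step back from a
   member's address to the start of the enclosing structure. */
#define DEMO_CONTAINING_RECORD(addr, type, field) \
    ((type *)((char *)(addr) - offsetof(type, field)))
```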
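The demo's design of this structure leaves plenty to reconsider before you use it in a real application.
With all of this in place, using WSARecv and WSASend is no longer hard.
Example
The code is long; take your time reading it.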
// THIS CODE AND INFORMATION IS PROVIDED "AS IS" WITHOUT WARRANTY OF
// ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
// PARTICULAR PURPOSE.
//
// Copyright (C) Microsoft Corporation. All Rights Reserved.
//

#pragma warning (disable:4127)
#pragma comment(lib, "ws2_32.lib")

#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <strsafe.h>

#define DEFAULT_PORT       "5001"
#define MAX_BUFF_SIZE      8192
#define MAX_WORKER_THREAD  16

#define xmalloc(s) HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, (s))
#define xfree(p)   HeapFree(GetProcessHeap(), 0, (p))

typedef enum _IO_OPERATION {
    ClientIoAccept,
    ClientIoRead,
    ClientIoWrite
} IO_OPERATION, *PIO_OPERATION;

// One per outstanding I/O operation. Overlapped must stay the first member so
// that the LPOVERLAPPED returned by GetQueuedCompletionStatus can be cast back.
typedef struct _PER_IO_CONTEXT {
    WSAOVERLAPPED           Overlapped;
    char                    Buffer[MAX_BUFF_SIZE];
    WSABUF                  wsabuf;
    int                     nTotalBytes;
    int                     nSentBytes;
    IO_OPERATION            IOOperation;
    SOCKET                  SocketAccept;
    struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;

// Used as the lpCompletionKey.
// Each socket has one _PER_SOCKET_CONTEXT.
// The socket's asynchronous operations are stored in pIOContext, which heads a
// linked list, so pIOContext should be treated as a dynamic array.
typedef struct _PER_SOCKET_CONTEXT {
    SOCKET                      Socket;
    LPFN_ACCEPTEX               fnAcceptEx;
    PPER_IO_CONTEXT             pIOContext;
    struct _PER_SOCKET_CONTEXT *pCtxtBack;
    struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;

SOCKET CreateSocket(void);
BOOL CreateListenSocket(void);
BOOL CreateAcceptSocket(BOOL fUpdateIOCP);
DWORD WINAPI WorkerThread(LPVOID WorkContext);
PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET s, IO_OPERATION ClientIo, BOOL bAddToList);
PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET s, IO_OPERATION ClientIO);
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful);
VOID CtxtListFree(void);
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext);
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext);
int myprintf(const char *lpFormat, ...);

BOOL                g_bEndServer = FALSE;
BOOL                g_bRestart = TRUE;
HANDLE              g_hIOCP = INVALID_HANDLE_VALUE;
SOCKET              g_sdListen = INVALID_SOCKET;
HANDLE              g_ThreadHandles[MAX_WORKER_THREAD];
WSAEVENT            g_hCleanupEvent[1];
PPER_SOCKET_CONTEXT g_pCtxtListenSocket = NULL;
PPER_SOCKET_CONTEXT g_pCtxtList = NULL;
CRITICAL_SECTION    g_CriticalSection;

int main(void) {
    SYSTEM_INFO systemInfo;
    WSADATA     wsaData;
    DWORD       dwThreadCount = 0;
    int         nRet = 0;
    HANDLE      hThread;
    DWORD       dwThreadId;

    for (int i = 0; i < MAX_WORKER_THREAD; i++) {
        g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
    }

    // Two worker threads per CPU is a heuristic. Cap the count so we never
    // write past the end of g_ThreadHandles.
    GetSystemInfo(&systemInfo);
    dwThreadCount = systemInfo.dwNumberOfProcessors * 2;
    if (dwThreadCount > MAX_WORKER_THREAD)
        dwThreadCount = MAX_WORKER_THREAD;

    // WSAStartup must succeed before any other Winsock call, including WSACreateEvent.
    if ((nRet = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
        myprintf("WSAStartup() failed: %d\n", nRet);
        return 1;
    }

    if (WSA_INVALID_EVENT == (g_hCleanupEvent[0] = WSACreateEvent())) {
        myprintf("WSACreateEvent() failed: %d\n", WSAGetLastError());
        WSACleanup();
        return 1;
    }

    InitializeCriticalSection(&g_CriticalSection);

    while (g_bRestart) {
        g_bRestart = FALSE;
        g_bEndServer = FALSE;
        WSAResetEvent(g_hCleanupEvent[0]);

        // Create the IOCP instance
        g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (g_hIOCP == NULL) {
            myprintf("CreateIoCompletionPort() failed to create I/O completion port: %d\n", GetLastError());
            goto done;
        }

        // Start worker threads to handle completion notifications.
        // The handles are kept in g_ThreadHandles so that shutdown can wait
        // for the threads to exit.
        for (DWORD dwCPU = 0; dwCPU < dwThreadCount; dwCPU++) {
            hThread = CreateThread(NULL, 0, WorkerThread, g_hIOCP, 0, &dwThreadId);
            if (hThread == NULL) {
                myprintf("CreateThread() failed to create worker thread: %d\n", GetLastError());
                goto done;
            }
            g_ThreadHandles[dwCPU] = hThread;
            hThread = INVALID_HANDLE_VALUE;
        }

        if (!CreateListenSocket())
            goto done;

        // Post the first accept operation
        if (!CreateAcceptSocket(TRUE))
            goto done;

        // Block the main thread until the server shuts down
        WSAWaitForMultipleEvents(1, g_hCleanupEvent, TRUE, WSA_INFINITE, FALSE);

done:
        // The server is exiting: clean up
        g_bEndServer = TRUE;

        // Cause worker threads to exit.
        // The workers call GetQueuedCompletionStatus with an INFINITE timeout,
        // so we post one completion packet per thread to the IOCP by hand.
        // Each worker dequeues one of these packets and exits instead of
        // waiting forever.
        if (g_hIOCP) {
            for (DWORD i = 0; i < dwThreadCount; i++) {
                PostQueuedCompletionStatus(g_hIOCP, 0, 0, NULL);
            }
        }

        // Make sure worker threads exit.
        if (WAIT_OBJECT_0 != WaitForMultipleObjects(dwThreadCount, g_ThreadHandles, TRUE, 1000)) {
            myprintf("WaitForMultipleObjects() failed: %d\n", GetLastError());
        } else {
            for (DWORD i = 0; i < dwThreadCount; i++) {
                if (g_ThreadHandles[i] != INVALID_HANDLE_VALUE)
                    CloseHandle(g_ThreadHandles[i]);
                g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
            }
        }

        if (g_sdListen != INVALID_SOCKET) {
            closesocket(g_sdListen);
            g_sdListen = INVALID_SOCKET;
        }

        if (g_pCtxtListenSocket) {
            // If the listening socket still has an outstanding operation,
            // wait for it to finish before cleaning up
            while (!HasOverlappedIoCompleted((LPOVERLAPPED)&g_pCtxtListenSocket->pIOContext->Overlapped))
                Sleep(0);

            if (g_pCtxtListenSocket->pIOContext->SocketAccept != INVALID_SOCKET)
                closesocket(g_pCtxtListenSocket->pIOContext->SocketAccept);
            g_pCtxtListenSocket->pIOContext->SocketAccept = INVALID_SOCKET;

            if (g_pCtxtListenSocket->pIOContext)
                xfree(g_pCtxtListenSocket->pIOContext);

            if (g_pCtxtListenSocket)
                xfree(g_pCtxtListenSocket);
            g_pCtxtListenSocket = NULL;
        }

        CtxtListFree();

        if (g_hIOCP) {
            CloseHandle(g_hIOCP);
            g_hIOCP = NULL;
        }
    } // while (g_bRestart)

    DeleteCriticalSection(&g_CriticalSection);
    if (g_hCleanupEvent[0] != WSA_INVALID_EVENT) {
        WSACloseEvent(g_hCleanupEvent[0]);
        g_hCleanupEvent[0] = WSA_INVALID_EVENT;
    }
    WSACleanup();
    return 0;
} // main

SOCKET CreateSocket(void) {
    int    nRet = 0;
    int    nZero = 0;
    SOCKET sdSocket = INVALID_SOCKET;

    sdSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (sdSocket == INVALID_SOCKET) {
        myprintf("WSASocket(sdSocket) failed: %d\n", WSAGetLastError());
        return sdSocket;
    }

    //
    // Disable send buffering on the socket. Setting SO_SNDBUF
    // to 0 causes winsock to stop buffering sends and perform
    // sends directly from our buffers, thereby save one memory copy.
    //
    // However, this does prevent the socket from ever filling the
    // send pipeline. This can lead to packets being sent that are
    // not full (i.e. the overhead of the IP and TCP headers is
    // great compared to the amount of data being carried).
    //
    // Disabling the send buffer has less serious repercussions
    // than disabling the receive buffer.
    //
    nZero = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_SNDBUF, (char *)&nZero, sizeof(nZero));
    if (nRet == SOCKET_ERROR) {
        myprintf("setsockopt(SNDBUF) failed: %d\n", WSAGetLastError());
        return sdSocket;
    }

    //
    // Don't disable receive buffering. This will cause poor network
    // performance since if no receive is posted and no receive buffers,
    // the TCP stack will set the window size to zero and the peer will
    // no longer be allowed to send data.
    //

    //
    // Do not set a linger value...especially don't set it to an abortive
    // close. If you set abortive close and there happens to be a bit of
    // data remaining to be transferred (or data that has not been
    // acknowledged by the peer), the connection will be forcefully reset
    // and will lead to a loss of data (i.e. the peer won't get the last
    // bit of data). This is BAD. If you are worried about malicious
    // clients connecting and then not sending or receiving, the server
    // should maintain a timer on each connection. If after some point,
    // the server deems a connection is "stale" it can then set linger
    // to be abortive and close the connection.
    //

    /*
    LINGER lingerStruct;
    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;
    nRet = setsockopt(sdSocket, SOL_SOCKET, SO_LINGER, (char *)&lingerStruct, sizeof(lingerStruct));
    if (nRet == SOCKET_ERROR) {
        myprintf("setsockopt(SO_LINGER) failed: %d\n", WSAGetLastError());
        return sdSocket;
    }
    */

    return sdSocket;
}

BOOL CreateListenSocket(void) {
    int              nRet = 0;
    struct addrinfo  hints = {0};
    struct addrinfo *addrlocal = NULL;

    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_IP;

    if (getaddrinfo(NULL, DEFAULT_PORT, &hints, &addrlocal) != 0) {
        myprintf("getaddrinfo() failed with error %d\n", WSAGetLastError());
        return FALSE;
    }

    if (addrlocal == NULL) {
        myprintf("getaddrinfo() failed to resolve/convert the interface\n");
        return FALSE;
    }

    g_sdListen = CreateSocket();
    if (g_sdListen == INVALID_SOCKET) {
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = bind(g_sdListen, addrlocal->ai_addr, (int)addrlocal->ai_addrlen);
    if (nRet == SOCKET_ERROR) {
        myprintf("bind() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    nRet = listen(g_sdListen, 5);
    if (nRet == SOCKET_ERROR) {
        myprintf("listen() failed: %d\n", WSAGetLastError());
        freeaddrinfo(addrlocal);
        return FALSE;
    }

    freeaddrinfo(addrlocal);
    return TRUE;
}

//
// Create a socket and invoke AcceptEx. Only the original call to this
// function needs to be added to the IOCP.
//
// If the expected behaviour of connecting client applications is to NOT
// send data right away, then only posting one AcceptEx can cause connection
// attempts to be refused if a client connects without sending some initial
// data (notice that the associated iocpclient does not operate this way
// but instead makes a connection and starts sending data right away).
// This is because the IOCP packet does not get delivered without the initial
// data (as implemented in this sample) thus preventing the worker thread
// from posting another AcceptEx and eventually the backlog value set in
// listen() will be exceeded if clients continue to try to connect.
//
// One technique to address this situation is to simply cause AcceptEx
// to return right away upon accepting a connection without returning any
// data. This can be done by setting dwReceiveDataLength=0 when calling AcceptEx.
//
// Another technique to address this situation is to post multiple calls
// to AcceptEx. Posting multiple calls to AcceptEx is similar in concept to
// increasing the backlog value in listen(), though posting AcceptEx is
// dynamic (i.e. during the course of running your application you can adjust
// the number of AcceptEx calls you post). It is important however to keep
// your backlog value in listen() high in your server to ensure that the
// stack can accept connections even if your application does not get enough
// CPU cycles to repost another AcceptEx under stress conditions.
//
// This sample implements neither of these techniques and is therefore
// susceptible to the behaviour described above.
//
BOOL CreateAcceptSocket(BOOL fUpdateIOCP) {
    int   nRet = 0;
    DWORD dwRecvNumBytes = 0;
    DWORD bytes = 0;
    GUID  acceptex_guid = WSAID_ACCEPTEX;

    // The context for listening socket uses the SocketAccept member to store the
    // socket for client connection.
    if (fUpdateIOCP) {
        g_pCtxtListenSocket = UpdateCompletionPort(g_sdListen, ClientIoAccept, FALSE);
        if (g_pCtxtListenSocket == NULL) {
            myprintf("failed to update listen socket to IOCP\n");
            return FALSE;
        }

        // Load the AcceptEx function pointer at run time
        // and store it in the listening socket's context
        nRet = WSAIoctl(g_sdListen, SIO_GET_EXTENSION_FUNCTION_POINTER,
                        &acceptex_guid, sizeof(acceptex_guid),
                        &g_pCtxtListenSocket->fnAcceptEx, sizeof(g_pCtxtListenSocket->fnAcceptEx),
                        &bytes, NULL, NULL);
        if (nRet == SOCKET_ERROR) {
            myprintf("failed to load AcceptEx: %d\n", WSAGetLastError());
            return FALSE;
        }
    }

    g_pCtxtListenSocket->pIOContext->SocketAccept = CreateSocket();
    if (g_pCtxtListenSocket->pIOContext->SocketAccept == INVALID_SOCKET) {
        myprintf("failed to create new accept socket\n");
        return FALSE;
    }

    // Post the accept operation.
    // Here we ask AcceptEx to also receive the first block of data
    // from the new connection.
    nRet = g_pCtxtListenSocket->fnAcceptEx(g_sdListen, g_pCtxtListenSocket->pIOContext->SocketAccept,
                                           (LPVOID)(g_pCtxtListenSocket->pIOContext->Buffer),
                                           MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) + 16)),
                                           sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16,
                                           &dwRecvNumBytes,
                                           (LPOVERLAPPED)&(g_pCtxtListenSocket->pIOContext->Overlapped));
    if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
        myprintf("AcceptEx() failed: %d\n", WSAGetLastError());
        return FALSE;
    }

    return TRUE;
}

DWORD WINAPI WorkerThread(LPVOID WorkThreadContext) {
    HANDLE              hIOCP = (HANDLE)WorkThreadContext;
    BOOL                bSuccess = FALSE;
    int                 nRet = 0;
    LPWSAOVERLAPPED     lpOverlapped = NULL;
    PPER_SOCKET_CONTEXT lpPerSocketContext = NULL;
    PPER_SOCKET_CONTEXT lpAcceptSocketContext = NULL;
    PPER_IO_CONTEXT     lpIOContext = NULL;
    WSABUF              buffRecv;
    WSABUF              buffSend;
    DWORD               dwRecvNumBytes = 0;
    DWORD               dwSendNumBytes = 0;
    DWORD               dwFlags = 0;
    DWORD               dwIoSize = 0;

    while (TRUE) {
        // Block until a completion notification arrives;
        // if none arrives, keep waiting
        bSuccess = GetQueuedCompletionStatus(hIOCP, &dwIoSize,
                                             (PULONG_PTR)&lpPerSocketContext,
                                             (LPOVERLAPPED *)&lpOverlapped,
                                             INFINITE);
        if (!bSuccess)
            myprintf("GetQueuedCompletionStatus() failed: %d\n", GetLastError());

        // FALSE with a NULL OVERLAPPED means no packet was dequeued (with an
        // INFINITE wait, typically because the port was closed). The key and
        // byte count are not valid in that case, so just exit.
        if (!bSuccess && lpOverlapped == NULL)
            return 0;

        // The packets posted with PostQueuedCompletionStatus at shutdown
        // (key 0) land here, and the worker thread exits normally
        if (lpPerSocketContext == NULL) {
            return 0;
        }

        if (g_bEndServer) {
            return 0;
        }

        lpIOContext = (PPER_IO_CONTEXT)lpOverlapped;

        //
        // We should never skip the loop and not post another AcceptEx if the current
        // completion packet is for previous AcceptEx
        //
        if (lpIOContext->IOOperation != ClientIoAccept) {
            if (!bSuccess || (bSuccess && (0 == dwIoSize))) {
                CloseClient(lpPerSocketContext, FALSE);
                continue;
            }
        }

        //
        // determine what type of IO packet has completed by checking the PER_IO_CONTEXT
        // associated with this socket. This will determine what action to take.
        //
        switch (lpIOContext->IOOperation) {
        case ClientIoAccept:
            //
            // When the AcceptEx function returns, the socket sAcceptSocket is
            // in the default state for a connected socket. The socket sAcceptSocket
            // does not inherit the properties of the socket associated with
            // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on
            // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT
            // option, specifying sAcceptSocket as the socket handle and sListenSocket
            // as the option value.
            //
            nRet = setsockopt(lpPerSocketContext->pIOContext->SocketAccept,
                              SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
                              (char *)&g_sdListen, sizeof(g_sdListen));
            if (nRet == SOCKET_ERROR) {
                // just warn user here.
                myprintf("setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed to update accept socket\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            lpAcceptSocketContext = UpdateCompletionPort(lpPerSocketContext->pIOContext->SocketAccept,
                                                         ClientIoAccept, TRUE);
            if (lpAcceptSocketContext == NULL) {
                // just warn user here.
                myprintf("failed to update accept socket to IOCP\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }

            if (dwIoSize) {
                // AcceptEx already received data: echo it back with a write
                lpAcceptSocketContext->pIOContext->IOOperation = ClientIoWrite;
                lpAcceptSocketContext->pIOContext->nTotalBytes = dwIoSize;
                lpAcceptSocketContext->pIOContext->nSentBytes = 0;
                lpAcceptSocketContext->pIOContext->wsabuf.len = dwIoSize;
                // Copy exactly dwIoSize bytes; a string copy would stop at the first NUL byte
                CopyMemory(lpAcceptSocketContext->pIOContext->Buffer,
                           lpPerSocketContext->pIOContext->Buffer, dwIoSize);
                lpAcceptSocketContext->pIOContext->wsabuf.buf = lpAcceptSocketContext->pIOContext->Buffer;

                nRet = WSASend(lpPerSocketContext->pIOContext->SocketAccept,
                               &lpAcceptSocketContext->pIOContext->wsabuf, 1,
                               &dwSendNumBytes, 0,
                               &(lpAcceptSocketContext->pIOContext->Overlapped), NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) AcceptEx completed (%d bytes), Send posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {
                //
                // AcceptEx completes but doesn't read any data so we need to post
                // an outstanding overlapped read.
                //
                lpAcceptSocketContext->pIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpAcceptSocketContext->pIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(lpAcceptSocketContext->Socket, &buffRecv, 1,
                               &dwRecvNumBytes, &dwFlags,
                               &lpAcceptSocketContext->pIOContext->Overlapped, NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpAcceptSocketContext, FALSE);
                }
            }

            //
            // Time to post another outstanding AcceptEx
            //
            if (!CreateAcceptSocket(FALSE)) {
                myprintf("Please shut down and reboot the server.\n");
                WSASetEvent(g_hCleanupEvent[0]);
                return 0;
            }
            break;

        case ClientIoRead:
            //
            // a read operation has completed, post a write operation to echo the
            // data back to the client using the same data buffer.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nTotalBytes = dwIoSize;
            lpIOContext->nSentBytes = 0;
            lpIOContext->wsabuf.len = dwIoSize;
            dwFlags = 0;
            nRet = WSASend(lpPerSocketContext->Socket, &lpIOContext->wsabuf, 1,
                           &dwSendNumBytes, dwFlags, &(lpIOContext->Overlapped), NULL);
            if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                myprintf("WSASend() failed: %d\n", WSAGetLastError());
                CloseClient(lpPerSocketContext, FALSE);
            } else {
                myprintf("WorkerThread %d: Socket(%d) Recv completed (%d bytes), Send posted\n",
                         GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
            }
            break;

        case ClientIoWrite:
            //
            // a write operation has completed, determine if all the data intended to be
            // sent actually was sent.
            //
            lpIOContext->IOOperation = ClientIoWrite;
            lpIOContext->nSentBytes += dwIoSize;
            dwFlags = 0;
            if (lpIOContext->nSentBytes < lpIOContext->nTotalBytes) {
                //
                // the previous write operation didn't send all the data,
                // post another send to complete the operation
                //
                buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
                buffSend.len = lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
                nRet = WSASend(lpPerSocketContext->Socket, &buffSend, 1,
                               &dwSendNumBytes, dwFlags, &(lpIOContext->Overlapped), NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSASend() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send partially completed (%d bytes), Send posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            } else {
                //
                // previous write operation completed for this socket, post another recv
                //
                lpIOContext->IOOperation = ClientIoRead;
                dwRecvNumBytes = 0;
                dwFlags = 0;
                buffRecv.buf = lpIOContext->Buffer;
                buffRecv.len = MAX_BUFF_SIZE;
                nRet = WSARecv(lpPerSocketContext->Socket, &buffRecv, 1,
                               &dwRecvNumBytes, &dwFlags, &lpIOContext->Overlapped, NULL);
                if (nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())) {
                    myprintf("WSARecv() failed: %d\n", WSAGetLastError());
                    CloseClient(lpPerSocketContext, FALSE);
                } else {
                    myprintf("WorkerThread %d: Socket(%d) Send completed (%d bytes), Recv posted\n",
                             GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
                }
            }
            break;
        } // switch
    } // while

    return 0;
}

//
// Allocate a context structures for the socket and add the socket to the IOCP.
// Additionally, add the context structure to the global list of context structures.
//
PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo, BOOL bAddToList) {
    PPER_SOCKET_CONTEXT lpPerSocketContext;
    HANDLE              hPort;

    lpPerSocketContext = CtxtAllocate(sd, ClientIo);
    if (lpPerSocketContext == NULL)
        return NULL;

    // Associate the socket with the existing IOCP, using its context as the key.
    // Keep the result in a local so a failure cannot overwrite g_hIOCP with NULL.
    hPort = CreateIoCompletionPort((HANDLE)sd, g_hIOCP, (ULONG_PTR)lpPerSocketContext, 0);
    if (hPort == NULL) {
        myprintf("CreateIoCompletionPort() failed: %d\n", GetLastError());
        if (lpPerSocketContext->pIOContext)
            xfree(lpPerSocketContext->pIOContext);
        xfree(lpPerSocketContext);
        return NULL;
    }

    //
    // The listening socket context (bAddToList is FALSE) is not added to the list.
    // All other socket contexts are added to the list.
    //
    if (bAddToList)
        CtxtListAddTo(lpPerSocketContext);

    myprintf("UpdateCompletionPort: Socket(%d) added to IOCP\n", lpPerSocketContext->Socket);
    return lpPerSocketContext;
}

//
// Close down a connection with a client. This involves closing the socket (when
// initiated as a result of a CTRL-C the socket closure is not graceful). Additionally,
// any context data associated with that socket is free'd.
//
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful) {
    EnterCriticalSection(&g_CriticalSection);

    if (lpPerSocketContext) {
        myprintf("CloseClient: Socket(%d) connection closing (graceful=%s)\n",
                 lpPerSocketContext->Socket, (bGraceful ? "TRUE" : "FALSE"));
        if (!bGraceful) {
            //
            // force the subsequent closesocket to be abortative.
            //
            LINGER lingerStruct;
            lingerStruct.l_onoff = 1;
            lingerStruct.l_linger = 0;
            setsockopt(lpPerSocketContext->Socket, SOL_SOCKET, SO_LINGER,
                       (char *)&lingerStruct, sizeof(lingerStruct));
        }
        if (lpPerSocketContext->pIOContext->SocketAccept != INVALID_SOCKET) {
            closesocket(lpPerSocketContext->pIOContext->SocketAccept);
            lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;
        }
        closesocket(lpPerSocketContext->Socket);
        lpPerSocketContext->Socket = INVALID_SOCKET;
        CtxtListDeleteFrom(lpPerSocketContext);
        lpPerSocketContext = NULL;
    } else {
        myprintf("CloseClient: lpPerSocketContext is NULL\n");
    }

    LeaveCriticalSection(&g_CriticalSection);
}

//
// Allocate a socket context for the new connection.
//
PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {
    PPER_SOCKET_CONTEXT lpPerSocketContext;

    EnterCriticalSection(&g_CriticalSection);

    lpPerSocketContext = (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));
    if (lpPerSocketContext) {
        lpPerSocketContext->pIOContext = (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));
        if (lpPerSocketContext->pIOContext) {
            lpPerSocketContext->Socket = sd;
            lpPerSocketContext->pCtxtBack = NULL;
            lpPerSocketContext->pCtxtForward = NULL;

            lpPerSocketContext->pIOContext->Overlapped.Internal = 0;
            lpPerSocketContext->pIOContext->Overlapped.InternalHigh = 0;
            lpPerSocketContext->pIOContext->Overlapped.Offset = 0;
            lpPerSocketContext->pIOContext->Overlapped.OffsetHigh = 0;
            lpPerSocketContext->pIOContext->Overlapped.hEvent = NULL;
            lpPerSocketContext->pIOContext->IOOperation = ClientIO;
            lpPerSocketContext->pIOContext->pIOContextForward = NULL;
            lpPerSocketContext->pIOContext->nTotalBytes = 0;
            lpPerSocketContext->pIOContext->nSentBytes = 0;
            lpPerSocketContext->pIOContext->wsabuf.buf = lpPerSocketContext->pIOContext->Buffer;
            lpPerSocketContext->pIOContext->wsabuf.len = sizeof(lpPerSocketContext->pIOContext->Buffer);
            lpPerSocketContext->pIOContext->SocketAccept = INVALID_SOCKET;

            ZeroMemory(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len);
        } else {
            myprintf("HeapAlloc() PER_IO_CONTEXT failed: %d\n", GetLastError());
            xfree(lpPerSocketContext);
            lpPerSocketContext = NULL;   // never hand back a freed pointer
        }
    } else {
        myprintf("HeapAlloc() PER_SOCKET_CONTEXT failed: %d\n", GetLastError());
    }

    // Single exit point so the critical section is always released
    LeaveCriticalSection(&g_CriticalSection);
    return lpPerSocketContext;
}

//
// Add a client connection context structure to the global list of context structures.
//
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext) {
    PPER_SOCKET_CONTEXT pTemp;

    EnterCriticalSection(&g_CriticalSection);

    if (g_pCtxtList == NULL) {
        //
        // add the first node to the linked list
        //
        lpPerSocketContext->pCtxtBack = NULL;
        lpPerSocketContext->pCtxtForward = NULL;
        g_pCtxtList = lpPerSocketContext;
    } else {
        //
        // add node to head of list
        //
        pTemp = g_pCtxtList;
        g_pCtxtList = lpPerSocketContext;
        lpPerSocketContext->pCtxtBack = pTemp;
        lpPerSocketContext->pCtxtForward = NULL;
        pTemp->pCtxtForward = lpPerSocketContext;
    }

    LeaveCriticalSection(&g_CriticalSection);
}

//
// Remove a client context structure from the global list of context structures.
//
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext) {
    PPER_SOCKET_CONTEXT pBack;
    PPER_SOCKET_CONTEXT pForward;
    PPER_IO_CONTEXT     pNextIO = NULL;
    PPER_IO_CONTEXT     pTempIO = NULL;

    EnterCriticalSection(&g_CriticalSection);

    if (lpPerSocketContext) {
        // g_pCtxtList is the newest node: pCtxtBack points toward older nodes
        // and pCtxtForward toward newer ones (see CtxtListAddTo).
        pBack = lpPerSocketContext->pCtxtBack;
        pForward = lpPerSocketContext->pCtxtForward;

        if (pBack == NULL && pForward == NULL) {
            // This is the only node in the list to delete
            if (g_pCtxtList == lpPerSocketContext)
                g_pCtxtList = NULL;
        } else if (pBack == NULL && pForward != NULL) {
            // This is the oldest node; the head of the list does not change
            pForward->pCtxtBack = NULL;
        } else if (pBack != NULL && pForward == NULL) {
            // This is the newest node (the head); the next older node becomes the head
            pBack->pCtxtForward = NULL;
            g_pCtxtList = pBack;
        } else {
            // Neither the head nor the oldest node
            pBack->pCtxtForward = pForward;
            pForward->pCtxtBack = pBack;
        }

        //
        // Free all i/o context structures per socket
        //
        pTempIO = lpPerSocketContext->pIOContext;
        while (pTempIO) {
            pNextIO = pTempIO->pIOContextForward;
            //
            // The overlapped structure is safe to free when only the posted i/o has
            // completed. Here we only need to test those posted but not yet received
            // by PQCS in the shutdown process.
            //
            if (g_bEndServer)
                while (!HasOverlappedIoCompleted((LPOVERLAPPED)pTempIO))
                    Sleep(0);
            xfree(pTempIO);
            pTempIO = pNextIO;
        }

        xfree(lpPerSocketContext);
        lpPerSocketContext = NULL;
    } else {
        myprintf("CtxtListDeleteFrom: lpPerSocketContext is NULL\n");
    }

    LeaveCriticalSection(&g_CriticalSection);
}

//
// Free all context structure in the global list of context structures.
//
VOID CtxtListFree(void) {
    PPER_SOCKET_CONTEXT pTemp1, pTemp2;

    EnterCriticalSection(&g_CriticalSection);

    pTemp1 = g_pCtxtList;
    while (pTemp1) {
        pTemp2 = pTemp1->pCtxtBack;
        CloseClient(pTemp1, FALSE);
        pTemp1 = pTemp2;
    }

    LeaveCriticalSection(&g_CriticalSection);
}

// Formats a message and writes it to the console.
int myprintf(const char *lpFormat, ...) {
    char    cBuffer[512];
    va_list arglist;
    HANDLE  hOut = NULL;
    HRESULT hRet;
    DWORD   dwWritten = 0;

    ZeroMemory(cBuffer, sizeof(cBuffer));

    va_start(arglist, lpFormat);
    hRet = StringCchVPrintfA(cBuffer, sizeof(cBuffer), lpFormat, arglist);
    va_end(arglist);

    // On STRSAFE_E_INSUFFICIENT_BUFFER the output is truncated but still
    // null-terminated, so it is fine to print.
    if (SUCCEEDED(hRet) || hRet == STRSAFE_E_INSUFFICIENT_BUFFER) {
        hOut = GetStdHandle(STD_OUTPUT_HANDLE);
        if (hOut != INVALID_HANDLE_VALUE && hOut != NULL)
            WriteConsoleA(hOut, cBuffer, lstrlenA(cBuffer), &dwWritten, NULL);
    }

    return (int)dwWritten;
}