《ASCE1885的源码分析》の基于完成端口模型的TCP服务器框架
使用IOCP的TCP服務器使用過程大體如下:
1)? 使用CreateIoCompletionPort函數創建完成端口,并以該I/O完成端口為參數創建多個服務線程;
2) 創建監聽套接字;
3) 接收客戶端連接請求,返回服務套接字;
4) 將服務套接字與完成端口綁定,并在該套接字上投遞初始I/O操作請求;
5) 返回步驟3);
?
服務線程的流程如下:
1)? 調用GetQueuedCompletionPort函數等待獲取完成信息;
2) 根據需要對數據進行處理并投遞后續的I/O操作請求;
3) 返回步驟1)。
?
程序代碼及注釋如下:
#include <stdio.h>
#include <winsock2.h>
#include <process.h>
#include <Windows.h>
?
#pragma comment(lib, "ws2_32.lib")
?
#define MAX_THREAD_NUM 24? //最大服務器線程數
#define MAX_BUF_LEN 5000?? //最大服務器I/O緩沖區大小
?
//枚舉類型,用于指示服務器I/O操作的類型
typedef enum _IO_OPER
{
??? SVR_IO_READ,
??? SVR_IO_WRITE???????
}IO_OPER, *LPIO_OPER;
?
//擴展重疊結構體,單I/O數據
typedef struct _OverLappedEx
{
??? OVERLAPPED OverLapped;
??? WSABUF???? wbuf; //I/O操作的數據對象
??? char?????? data[MAX_BUF_LEN];//實際的數據緩沖區
??? IO_OPER??? oper; //用于標志I/O操作的類型
??? DWORD????? flags; //用于設定或者返回I/O操作的標志
} PER_IO_DATA, *LPPER_IO_DATA;
?
//完成鍵結構體,單句柄數據,對應每個服務套接字---每個連接
typedef struct _CONN_CTX
{
??? SOCKET sockAccept; //該連接的服務器偵聽服務套接字
??? LPPER_IO_DATA pPerIOData; //指向該連接的I/O操作信息
??? struct _CONN_CTX *pPrev; //用于形成服務器當前所有連接信息的雙向鏈表
??? struct _CONN_CTX *pNext; //分別指向鏈表中前一個節點和后一個節點
} CONN_CTX, *LPCONN_CTX;
?
CRITICAL_SECTION g_CriticalSection; //防止對連接信息鏈表的訪問沖突
LPCONN_CTX g_ptrConnCtxHead = NULL; //指向雙向鏈表頭節點,用于對該鏈表的訪問和維護
SOCKET g_sockListen = INVALID_SOCKET;
?
//該函數用于對控制臺消息進行處理,當接收到CTRL_C_EVENT、
//CTRL_LOGOFF_EVENT、?CTRL_SHUTDOWN_EVENT或者CTRL_CLOSE_EVENT
//事件時,服務器將關閉監聽套接字,從而使主線程從接收連接的死循環中
//退出,并最終結束所有服務線程,釋放連接等
BOOL WINAPI CtrlHandler(DWORD dwEvent);
//完成端口操作函數
HANDLE CreateNewIoCompletionPort(DWORD dwNumberOfConcurrentThreads);
BOOL AssociateWithIoCompletePort(HANDLE hComPort, HANDLE hDevice, DWORD dwCompKey);
//創建監聽套接字
SOCKET CreateListenSock();
//當服務器接受了客戶端連接請求后,將返回的服務套接字和完成端口
//作為參數調用該函數,該函數完成服務套接字與完成端口的綁定
//以及為該連接的相關信息分配存儲區的工作
LPCONN_CTX CreateConnCtx(SOCKET sockAccept, HANDLE hIOCP);
?
//將新的連接信息加入到全局的連接信息鏈表中
void ConnListAdd(LPCONN_CTX lpConnCtx);
?
//將指定的連接信息從全局連接信息鏈表中刪除,
//并關閉連接,釋放相應的存儲區資源
void ConnListRemove(LPCONN_CTX lpConnCtx);
?
//完成服務器退出時關閉連接、釋放資源的工作
void ConnListClear();
?
//由于printf函數只能在用C運行庫中函數創建的線程中使用
//因此,本程序重寫自己的輸出函數
int ASCEPrintf(const char* lpFormat, ...);
//工作線程函數
DWORD WINAPI WorkThread(LPVOID lpParam);
?
int main(int argc, char* argv[])
{
??? HANDLE hIOCP;
??? HANDLE hThreadHandles[MAX_THREAD_NUM];???
??? int nThreadCount;
???
??? WSADATA wsaData;
??? if(WSAStartup(MAKEWORD(2,2), &wsaData) != 0)
??? {
??????? ASCEPrintf("Winsock initialized failed.../n");
??????? return -1;????????????????????????????
??? }
??? int i;
??? for(i=0; i<MAX_THREAD_NUM; i++)
??? {
??????? hThreadHandles[i] = NULL;???????
??? }
???
??? //設置控制臺事件響應函數o
??? if(!SetConsoleCtrlHandler(CtrlHandler, TRUE))
??? {
??????? ASCEPrintf("SetConsoleCtrlHandler:%d/n", GetLastError());
??????? return -1;??????????????????????????????????????
??? }
???
??? InitializeCriticalSection(&g_CriticalSection);
???
??? __try
??? {
??????? //創建I/O完成端口
??????? hIOCP = CreateNewIoCompletionPort(0);????
??????? if(hIOCP == NULL)
??????? {
??????????? ASCEPrintf("CreateIoCompletionPort:%d/n", GetLastError());
??????????? __leave;????????
??????? }
???????
??????? //創建多個工作線程
??????? SYSTEM_INFO sysInfo;
??????? GetSystemInfo(&sysInfo);
??????? //將sysInfo.dwNumberOfProcessors*2+2和¨a
??????? //MAX_THREAD_NUM之間的較小者賦給nThreadCount
??????? nThreadCount = (sysInfo.dwNumberOfProcessors*2+2) < MAX_THREAD_NUM
?????????????????????? ? (sysInfo.dwNumberOfProcessors*2+2) : MAX_THREAD_NUM;
??????? for(int i=0; i<nThreadCount; i++)
??????? {
??????????? HANDLE hThread = CreateThread(NULL, 0, WorkThread, hIOCP, 0, NULL);
??????????? if(hThread == NULL)
??????????? {
??????????????? ASCEPrintf("CreateThread:%d/n", GetLastError());
??????????????? __leave;??????????
??????????? } else {
??????????????? hThreadHandles[i] = hThread;??????
??????????? }
??????? }
??????? g_sockListen = CreateListenSock();
??????? if(g_sockListen == INVALID_SOCKET)
??????????? __leave;
???????
??????? SOCKET sockAccept;
??????? LPCONN_CTX lpConnCtx;
??????? int nResult;
??????? while(true)
??????? {
??????????? sockAccept = accept(g_sockListen, NULL, NULL);
??????????? if(sockAccept == INVALID_SOCKET)
??????????????? __leave;
??????????? lpConnCtx = CreateConnCtx(sockAccept, hIOCP);
??????????? if(lpConnCtx == NULL)
??????????????? _leave;
??????????? else
??????????????? ConnListAdd(lpConnCtx);
???????????
??????????? //投遞初始I/O操作
??????????? nResult = WSARecv(sockAccept,
??????????????????????? &(lpConnCtx->pPerIOData->wbuf),
??????????????????????? 1, NULL,
??????????????????????? &(lpConnCtx->pPerIOData->flags),
??????????????????????? &(lpConnCtx->pPerIOData->OverLapped),
??????????????????????? NULL);
??????????? if((nResult == SOCKET_ERROR) &&
??????????????????????? (WSAGetLastError() != ERROR_IO_PENDING))
??????????? {
??????????????? ASCEPrintf("WSARecv:%d/n", WSAGetLastError());
??????????????? ConnListRemove(lpConnCtx);
??????????????? break;???????????
??????????? }
??????? }??????????????
??? } __finally {
??????? if(hIOCP)
??????? {
??????????? for(int i=0; i<nThreadCount; i++)
??????????? {
???????? ???????PostQueuedCompletionStatus(hIOCP, 0, 0, NULL);???????
??????????? }????????
??????? }??
??????? //等待所有工作線程結束
??????? if(WAIT_OBJECT_0 != WaitForMultipleObjects(nThreadCount,
??????????? hThreadHandles, TRUE, 1000))
??????????? ASCEPrintf("WaitForMultipleObjects failed:%d/n", GetLastError());
??????? else {
???????????? for(int i=0; i<nThreadCount; i++)
???????????? {
???????????????? if(hThreadHandles[i] != NULL)
???????????????? {
???????????????????? if(!CloseHandle(hThreadHandles[i]))
???????????????????????? ASCEPrintf("CloseHandle:%d/n", GetLastError());????????????????????
???????????????? }???????
???????????????? hThreadHandles[i] = NULL;
???????????? }
??????? }
???????
??????? if(hIOCP)
??????? {
?????? ?????CloseHandle(hIOCP);
??????????? hIOCP = NULL;????????
??????? }???
???????
??????? if(g_sockListen != INVALID_SOCKET)
??????? {
??????????? closesocket(g_sockListen);
??????????? g_sockListen = INVALID_SOCKET;???????????????
??????? }
???????
?? ?????if(g_ptrConnCtxHead)
??????????? ConnListClear();
???????
??????? ASCEPrintf("...............Stopped./n");
??????? DeleteCriticalSection(&g_CriticalSection);
??????? SetConsoleCtrlHandler(CtrlHandler, FALSE);
??????? WSACleanup();
???????
??????? return 0;
??? }??
}
?
BOOL WINAPI CtrlHandler(DWORD dwEvent)
{
??? SOCKET sockTemp = INVALID_SOCKET;
??? switch(dwEvent)
??? {
??? case CTRL_C_EVENT:
??? case CTRL_LOGOFF_EVENT:
??? case CTRL_SHUTDOWN_EVENT:
??? case CTRL_CLOSE_EVENT:
???????? ASCEPrintf("Server Stopping........./n");
???????? sockTemp = g_sockListen;
???????? g_sockListen = INVALID_SOCKET;
???????? if(sockTemp != INVALID_SOCKET)
???????? {
???????????? closesocket(sockTemp);
???????????? sockTemp = INVALID_SOCKET;???????????
???????? } ???
???????? break;
??? default:
???????? return FALSE;??????????
??? }????
???
??? return TRUE;
}
?
//創建I/O完成端口
HANDLE CreateNewIoCompletionPort(DWORD dwNumberOfConcurrentThreads)
{
??? return (CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0,
???????????? dwNumberOfConcurrentThreads));??????
}
?
//將套接字與完成端口關聯
BOOL AssociateWithIoCompletionPort(HANDLE hComPort, HANDLE hDevice,
????????????????????????????????????????? DWORD dwCompKey)
{
??? return (CreateIoCompletionPort(hDevice, hComPort, dwCompKey, 0)
?????????????? == hComPort);
}
?
//創建服務器監聽套接字
SOCKET CreateListenSock()
{
??? //創建WSA_FLAG_OVERLAPPED屬性的套接字
??? SOCKET sock = WSASocket(AF_INET, SOCK_STREAM, 0,
?????????? NULL, 0, WSA_FLAG_OVERLAPPED);
??? if(sock == INVALID_SOCKET)
??????? return sock;
??? BOOL bReuseAddr = true;
??? if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char*)&bReuseAddr,
???????? sizeof(bReuseAddr)) == SOCKET_ERROR)
??? {
??????? ASCEPrintf("setsockopt:%d/n", WSAGetLastError());
??????? closesocket(sock);
??????? return INVALID_SOCKET;????
??? }????
???
??? struct sockaddr_in local;
??? memset(&local, 0, sizeof(local));
??? local.sin_addr.s_addr = INADDR_ANY;
??? local.sin_family = AF_INET;
??? local.sin_port = htons(9999);
??? if(bind(sock, (struct sockaddr*)&local, sizeof(local)) == SOCKET_ERROR)
??? {
??????? ASCEPrintf("bind:%d/n", WSAGetLastError());
??????? closesocket(sock);
?? ?????return INVALID_SOCKET;?????????????
??? }?
???
??? if(listen(sock, 5) == SOCKET_ERROR)
??? {
??????? ASCEPrintf("listen:%d/n", WSAGetLastError());
??????? closesocket(sock);
??????? return INVALID_SOCKET;???????????????
??? }
??? return sock;
}
?
LPCONN_CTX CreateConnCtx(SOCKET sockAccept, HANDLE hIOCP)
{
??? LPCONN_CTX lpConnCtx = (LPCONN_CTX)GlobalAlloc(GPTR, sizeof(CONN_CTX));
??? if(lpConnCtx == NULL)
??????? return NULL;
???????
??? lpConnCtx->pPerIOData = (LPPER_IO_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_DATA));??????
??? if(lpConnCtx->pPerIOData == NULL)
??? {
??????? GlobalFree(lpConnCtx);
??????? lpConnCtx = NULL;
??????? return NULL;?????????????????????????
??? }
???
??? //賦值
??? lpConnCtx->pNext = NULL;
??? lpConnCtx->pPrev = NULL;
??? lpConnCtx->sockAccept = sockAccept;
???
??? ZeroMemory(lpConnCtx->pPerIOData, sizeof(PER_IO_DATA));
??? lpConnCtx->pPerIOData->OverLapped.hEvent = NULL;
??? lpConnCtx->pPerIOData->OverLapped.Internal = 0;
??? lpConnCtx->pPerIOData->OverLapped.InternalHigh = 0;
??? lpConnCtx->pPerIOData->OverLapped.Offset = 0;
??? lpConnCtx->pPerIOData->OverLapped.OffsetHigh = 0;
??? lpConnCtx->pPerIOData->wbuf.buf = (char*)lpConnCtx->pPerIOData->data;
??? lpConnCtx->pPerIOData->wbuf.len = MAX_BUF_LEN;
??? lpConnCtx->pPerIOData->oper = SVR_IO_READ;
??? lpConnCtx->pPerIOData->flags = 0;
???
??? //將套接字和完成端口綁定
??? if(!AssociateWithIoCompletionPort(hIOCP, (HANDLE)sockAccept,
???????????????????????????????????????????? (DWORD)lpConnCtx))
??? {
??????? ASCEPrintf("AssociateWithIoCompletionPort:%d/n", GetLastError());
??????? GlobalFree(lpConnCtx->pPerIOData);
??????? GlobalFree(lpConnCtx);
??????? lpConnCtx = NULL;
??????? return NULL;????????????????????????????????????????
??? }
?? ?return lpConnCtx;
}
?
void ConnListAdd(LPCONN_CTX lpConnCtx)
{
??? LPCONN_CTX pTemp;
??? EnterCriticalSection(&g_CriticalSection);
??? if(g_ptrConnCtxHead == NULL)
??? {
??????? //鏈表的第一個節點
??????? lpConnCtx->pPrev = NULL;
??????? lpConnCtx->pNext = NULL;
??????? g_ptrConnCtxHead = lpConnCtx;???????????????????
??? } else {
??????? //加到鏈表頭部
??????? pTemp = g_ptrConnCtxHead;
??????? g_ptrConnCtxHead = lpConnCtx;
??????? lpConnCtx->pNext = pTemp;
??????? lpConnCtx->pPrev = NULL;
??????? pTemp->pPrev = lpConnCtx;??????
??? }
??? LeaveCriticalSection(&g_CriticalSection);
}
?
void ConnListRemove(LPCONN_CTX lpConnCtx)
{
??? LPCONN_CTX pPrev = NULL;
??? LPCONN_CTX pNext = NULL;????
??? EnterCriticalSection(&g_CriticalSection);
??? if(lpConnCtx != NULL)
??? {
??????? pPrev = lpConnCtx->pPrev;
??????? pNext = lpConnCtx->pNext;
??????? if((pPrev == NULL) && (pNext == NULL)) //鏈表唯一的節點
??????? {
??????????? g_ptrConnCtxHead = NULL;?????????
??????? } else if((pPrev == NULL) && (pNext != NULL)){ //鏈表首節點
??????????? pNext->pPrev = NULL;
??????????? g_ptrConnCtxHead = pNext;
??????? } else if((pPrev != NULL) && (pNext == NULL)){ //鏈表尾節點
??????????? pPrev->pNext = NULL;
??????? } else if((pPrev && pNext)){ //鏈表中間節點
??????????? pPrev->pNext = pNext;
??????????? pNext->pPrev = pPrev;
??????? }?????
??????? //關閉連接,釋放資源
??????? closesocket(lpConnCtx->sockAccept);
??????? GlobalFree(lpConnCtx->pPerIOData);
??????? GlobalFree(lpConnCtx);
??????? lpConnCtx = NULL;
??? }
??? LeaveCriticalSection(&g_CriticalSection);
??? return;
}
?
void ConnListClear()
{
??? LPCONN_CTX pTemp1, pTemp2;
??? EnterCriticalSection(&g_CriticalSection);
???
??? pTemp1 = g_ptrConnCtxHead;
??? while(pTemp1)
??? {
??????? pTemp2 = pTemp1->pNext;
??????? ConnListRemove(pTemp1);
??????? pTemp1 = pTemp2;
??? }
???
??? LeaveCriticalSection(&g_CriticalSection);
??? return;
}
?
int ASCEPrintf(const char* lpFormat, ...)
{
??? int nLen = 0;
??? int nRet = 0;
??? char cBuffer[512];
??? va_list arglist;
??? HANDLE hOut = NULL;
??? ZeroMemory(cBuffer, sizeof(cBuffer));
???
??? va_start(arglist, lpFormat);
???
??? nLen = lstrlen(lpFormat);
??? nRet = wvsprintf(cBuffer, lpFormat, arglist);
???
??? if(nRet >= nLen || GetLastError() == 0)
??? {
??????? hOut = GetStdHandle(STD_OUTPUT_HANDLE);
??????? if(hOut != INVALID_HANDLE_VALUE)
??????? {
??????????? WriteConsole(hOut, cBuffer, lstrlen(cBuffer),
?????????????????????????????? (LPDWORD)&nLen, NULL);?????????????????????????????????????
??????? }???????
??? }
??? return nLen;
}
?
?
DWORD WINAPI WorkThread(LPVOID lpParam)
{
??? HANDLE hIOCP = (HANDLE)lpParam;
??? BOOL bSuccess = FALSE;
??? DWORD dwIOSize;
??? LPPER_IO_DATA lpPerIOData;
??? LPOVERLAPPED lpOverLapped;
??? LPCONN_CTX lpConnCtx;
??? int nResult;
???
??? while(1)
??? {
??????? bSuccess = GetQueuedCompletionStatus(hIOCP, &dwIOSize,
???????????????? (LPDWORD)&lpConnCtx, &lpOverLapped, INFINITE);
??????? if(!bSuccess)
??????? {
??????????? ASCEPrintf("GetQueuedCompletionStatus:%d/n", GetLastError());
??????? }
???????
??????? if(lpConnCtx == NULL)
??????? {
??????????? return 1;????????????
??????? }
???????
??????? lpPerIOData = (LPPER_IO_DATA)(lpOverLapped);
??????? if(!bSuccess || (bSuccess && (dwIOSize == 0)))
??????? {
??????????? ConnListRemove(lpConnCtx);
??????????? continue;????????????
??????? }
#ifdef _DEBUG
??????? ASCEPrintf("Different way to obtain PER_IO_DATA/n");
??????? ASCEPrintf("The two one must be equal - A:%x/tB:%x/n",
????????????????????? lpConnCtx->pPerIOData, lpPerIOData);
#endif
??????? switch(lpPerIOData->oper)
??????? {
??????? case SVR_IO_WRITE:?????? //send then reveive
#ifdef _DEBUG
???????????? ASCEPrintf("Socket %d Send: %s/n", lpConnCtx->sockAccept,
??????????????????????????????? lpPerIOData->wbuf.buf);
#endif
???????????? ZeroMemory(lpPerIOData, sizeof(PER_IO_DATA));???????????
???????????? lpPerIOData->OverLapped.hEvent = NULL;
???????????? lpPerIOData->OverLapped.Internal = 0;
????? ???????lpPerIOData->OverLapped.InternalHigh = 0;
???????????? lpPerIOData->OverLapped.Offset = 0;
???????????? lpPerIOData->OverLapped.OffsetHigh = 0;
???????????? lpPerIOData->wbuf.buf = (char*)&(lpPerIOData->data);
???????????? lpPerIOData->wbuf.len = MAX_BUF_LEN;
???????????? lpPerIOData->oper = SVR_IO_READ;
???????????? lpPerIOData->flags = 0;
????????????
???????????? nResult = WSARecv(lpConnCtx->sockAccept,
?????????????????????????? &(lpPerIOData->wbuf),
?????????????????????????? 1, NULL, &(lpPerIOData->flags),
?????????????????????????? &(lpPerIOData->OverLapped),
?????????????????????????? NULL);
???????????? if(nResult == SOCKET_ERROR && WSAGetLastError() != ERROR_IO_PENDING)
???????????? {
?????????? ??????ASCEPrintf("WSARecv:%d/n", WSAGetLastError());
???????????????? ConnListRemove(lpConnCtx);??????????
???????????? }
???????????? break;
??????? case SVR_IO_READ: //receive then echo
#ifdef _DEBUG
???????????? ASCEPrintf("Socket %d recv:%s/n", lpConnCtx->sockAccept,
?????????????????? lpPerIOData->wbuf.buf);
#endif
???????????? lpPerIOData->wbuf.len = dwIOSize;
???????????? lpPerIOData->oper = SVR_IO_WRITE;
???????????? lpPerIOData->flags = 0;
???????????? nResult = WSASend(lpConnCtx->sockAccept,
??? ??????????????????????????&(lpPerIOData->wbuf),
????????????????????????????? 1, NULL, lpPerIOData->flags,
????????????????????????????? &(lpPerIOData->OverLapped),
????????????????????????????? NULL);
???????????? if(nResult == SOCKET_ERROR &&
????????? ??????????????WSAGetLastError() != ERROR_IO_PENDING)
???????????? {
???????????????? ASCEPrintf("WSASend:%d/n", WSAGetLastError());
???????????????? ConnListRemove(lpConnCtx);??????????
???????????? }
???????????? break;
??????? default:
???????????? break;
??????? }
??? }
??? return 0;
}
?
?
轉載于:https://www.cnblogs.com/android-html5/archive/2010/09/15/2533983.html
總結
以上是生活随笔為你收集整理的《ASCE1885的源码分析》の基于完成端口模型的TCP服务器框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CTAS VS create table
- 下一篇: ASP.NET页面揭秘之页面生命周期