生活随笔
收集整理的這篇文章主要介紹了
ACE_Proactor网络通信示例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
注:本文僅對使用ACE進行網絡通信進行演示說明。本文中的代碼皆使用doxgen的注釋風格。本文中使用的事件機制,其原理與實現請參考[ 基于C++的事件機制設計[2.0]]一文。
?
ACE的Proactor對Epoll和IOCP進行了良好包裝,因此,使用ACE來進行網絡開發是相當的便利,性能也不差。閑言少敘,看代碼。
這里以TCP協議進行流式通信。我們需要解析流,得出每次接收到的數據包大小和包含的數據域,假定我們的包結構如下:
?
包序列號(32Bit) | 長度(16Bit) | 數據域(大小為長度所表示的字節)... | (下一包)
?
通過分析由包序列號和長度組成的包頭來解決半包,粘包等問題,許多其它文章也有描述,這里就省略了。
這樣可以確定我們的包頭結構如下:
?
[cpp] view plain
copy #pragma?pack(push)??#pragma?pack(1)??????????????typedef?struct?tag_TTcpPackHeader??????{??????????unsigned?int?seq;???????????unsigned?short?len;???????}TTcpPackHeader;??#pragma?pack(pop)????#define?TCP_PACK_HEADER_SIZE?sizeof(tag_TTcpPackHeader)??
.
需要注意的是,要求在字節邊界對齊。
?
現在來看看通過ACE來實現TCP通信需要哪些東西:
INET_Addr 用于地址訪問
Task_Base 用于線程模型
Message_Block 用于消息傳遞和數據容器
Asynch_IO 異步通信
Proactor IOCP架構
?
并且,要建立這樣的通信架構,我們需要:
一個Acceptor:用于接受連接
一個Handler:對應于每個連接句柄,并用于數據的發送/接收。
一個事件分發線程:以事件的形式將接收到數據分發出去,并在對應的句柄上進行數據發送。
?
本示例并沒有采用在接收到數據時立即進行處理的方式,而是通過創建一個額外的事件分發線程的形式,將數據包投遞到該線程的消息隊列中,由該線程向外派送。因此,數據處理與網絡層是隔離的,且網絡層能專注于通信,最大的發揮效用。
?
好了,下面來看看實現:
先看Handler,參考ACE_Service_Handler,我們需要重載open(),addresses(), handle_read_stream(),handle_write_stream(),以在連接打開時進行讀寫流對象的初始化、獲取客戶端地址,處理輸入/輸入流。
?
注:以T作為類的開頭而不是C,是出于對曾經偉大的BORLAND的深刻懷念。
注:成員又以m_開頭,是出于對現而今仍偉大的MS的深刻怨念。
?
[cpp] view plain
copy ?????class?TTcpHandler?:?public?ACE_Service_Handler??{??public:?????????????????typedef?TEvent<void,?ACE_UINT32,?ACE_UINT16,?TTcpHandler?*>?TOnClientConnect;????????????????typedef?TEvent<void,?ACE_UINT32,?ACE_UINT16>?TOnClientDisconnect;???????????????????typedef?TEvent<bool,?ACE_UINT32,?ACE_UINT16>?TOnClientValidate;???????????????????typedef?TEvent<void,?ACE_UINT32,?ACE_UINT16,?unsigned?int,?const?char*,?unsigned?short>?TOnDataReceive;?????????????????????????typedef?TEvent<void,?ACE_UINT32,?ACE_UINT16,?unsigned?int,?const?char*,?unsigned?short>?TOnDataSendSucceeded;?????????????????????????typedef?TEvent<void,?ACE_UINT32,?ACE_UINT16,?unsigned?int,?const?char*,?unsigned?short>?TOnDataSendFailed;??private:??????ACE_Asynch_Read_Stream?m_Reader;???????ACE_Asynch_Write_Stream?m_Writer;???????ACE_Message_Block*?m_CurDataMB;???????ACE_INET_Addr?m_ClientAddr;???public:???????????????DECL_PROP(TOnClientConnect,?OnClientConnect)???????DECL_PROP(TOnClientDisconnect,?OnClientDisconnect)???????DECL_PROP(TOnDataReceive,?OnDataReceive)???????DECL_PROP(TOnDataSendSucceeded,?OnDataSendSucceeded)???????DECL_PROP(TOnDataSendFailed,?OnDataSendFailed)???????????public:????????????TTcpHandler();??????????????????~TTcpHandler();??????????????????????????int?send(unsigned?int?seq,?const?char*?data,?unsigned?short?size);?????????????????????virtual?void?open(ACE_HANDLE?h,?ACE_Message_Block&?mb);???????????????virtual?void?addresses?(const?ACE_INET_Addr?&remote_address,????????????????????????????const?ACE_INET_Addr?&local_address);?????????????????????virtual?void?handle_read_stream(const?ACE_Asynch_Read_Stream::Result&?result);???????????????virtual?void?handle_write_stream(const?ACE_Asynch_Write_Stream::Result&?result);??????????????void?initCurDataMB();??};???
?
而相應滴,Acceptor在接受連接時,產生出的Handler應該是TTcpHandler類型,其定義如下:
注意,為了將事件句柄與連接句柄(TTcpHandler)掛鉤,這里重載了make_handler()。而重載validate_connection則是為了讓連接驗證事件能夠在恰當的時機被激發。
[cpp] view plain
copy ???????class?TTcpAcceptor?:?public?ACE_Asynch_Acceptor<TTcpHandler>??{??public:????????????????DECL_PROP(TTcpHandler::TOnClientConnect,?OnClientConnect)??????DECL_PROP(TTcpHandler::TOnClientDisconnect,?OnClientDisconnect)??????DECL_PROP(TTcpHandler::TOnClientValidate,?OnClientValidate)??????DECL_PROP(TTcpHandler::TOnDataReceive,?OnDataReceive)??????DECL_PROP(TTcpHandler::TOnDataSendSucceeded,?OnDataSendSucceeded)??????DECL_PROP(TTcpHandler::TOnDataSendFailed,?OnDataSendFailed)??????????protected:????????????????virtual?int?validate_connection?(const?ACE_Asynch_Accept::Result&?result,?????????????????????????????????const?ACE_INET_Addr?&remote,?????????????????????????????????const?ACE_INET_Addr&?local);???????????????virtual?TTcpHandler*?make_handler(void);??};???
?
有了Acceptor和Handler,還需要使之運行于Proactor模式下,因此有了以下線程:
[cpp] view plain
copy ???????class?TTcpNetThread?:?public?ACE_Task_Base??{??public:????????????????DECL_PROP(TTcpHandler::TOnClientConnect,?OnClientConnect)??????DECL_PROP(TTcpHandler::TOnClientDisconnect,?OnClientDisconnect)??????DECL_PROP(TTcpHandler::TOnClientValidate,?OnClientValidate)??????DECL_PROP(TTcpHandler::TOnDataReceive,?OnDataReceive)??????DECL_PROP(TTcpHandler::TOnDataSendSucceeded,?OnDataSendSucceeded)??????DECL_PROP(TTcpHandler::TOnDataSendFailed,?OnDataSendFailed)????????????????????int?open();????????????int?close();??protected:????????????virtual?int?svc();??};??
?
最后再看看事件分發線程,該線程也是對上述實現的聚合和封裝,對外暴露事件和發送方法:
注意,該類也負責響應TTcpNetThread所激發的事件,所以需要派生自TObject。
?
[cpp] view plain
copy ?????class?TTcp?:?public?TObject,?public?ACE_Task<ACE_MT_SYNCH>??{??public:????????????????typedef?TTcpHandler::TOnClientConnect?TOnClientConnect;??????typedef?TTcpHandler::TOnClientDisconnect?TOnClientDisconnect;??????typedef?TTcpHandler::TOnClientValidate?TOnClientValidate;??????typedef?TTcpHandler::TOnDataReceive?TOnDataReceive;??????typedef?TTcpHandler::TOnDataSendSucceeded?TOnDataSendSucceeded;??????typedef?TTcpHandler::TOnDataSendFailed?TOnDataSendFailed;??????????private:???????????????ACE_Recursive_Thread_Mutex?m_Lock;???????hash_map<unsigned?__int64,?TTcpHandler?*>?m_AddrMap;???????TTcpNetThread*?m_TcpNetThd;??????????public:????????????????DECL_PROP(TTcpHandler::TOnClientConnect,?OnClientConnect)??????DECL_PROP(TTcpHandler::TOnClientDisconnect,?OnClientDisconnect)??????DECL_PROP(TTcpHandler::TOnClientValidate,?OnClientValidate)??????DECL_PROP(TTcpHandler::TOnDataReceive,?OnDataReceive)??????DECL_PROP(TTcpHandler::TOnDataSendSucceeded,?OnDataSendSucceeded)??????DECL_PROP(TTcpHandler::TOnDataSendFailed,?OnDataSendFailed)??????????public:????????????TTcp();????????????~TTcp();????????????void?open();????????????void?close();??????????????????int?send(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?buf,?unsigned?short?len);??private:????????????virtual?int?svc();??private:????????????????void?tcpNetThread_OnClientConnect(ACE_UINT32?ip,?ACE_UINT16?port,?TTcpHandler*?handler);??????void?tcpNetThread_OnClientDisconnect(ACE_UINT32?ip,?ACE_UINT16?port);??????void?tcpNetThread_OnDataReceive(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size);??????void?tcpNetThread_OnDataSendSucceeded(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size);??????void?tcpNetThread_OnDataSendFailed(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size);??????????};???
?
現在基本格調已經確定,需要做的是編寫具體實現代碼了。
?
?
此乃末技。
?
?
應用ACE來作為底層通信的框架,已經是許多年前的技術了,這里純粹是湊字數,騙更新滴。這樣的老東西,確實是相當的讓人無語。
?現在我們一步步來看看實現:
先是TTcpAcceptor,該類僅重載了兩個方法,如下:
[cpp] view plain
copy #include?"TCPAcceptor.h"??namespace?igame??{??????int?TTcpAcceptor::validate_connection?(const?ACE_Asynch_Accept::Result&?result,?????????????????????????????????????????const?ACE_INET_Addr?&remote,?????????????????????????????????????????const?ACE_INET_Addr&?local)??????{??????????if?(m_OnClientValidate.valid())????????????????????????????return?m_OnClientValidate(remote.get_ip_address(),?remote.get_port_number())???0?:?-1;??????????else??????????????return?0;???????}??????TTcpHandler*?TTcpAcceptor::make_handler(void)??????{??????????TTcpHandler*?handler?=?0;??????????ACE_NEW_RETURN?(handler,?TTcpHandler(),?0);????????????????????handler->setOnClientConnect(m_OnClientConnect);??????????handler->setOnClientDisconnect(m_OnClientDisconnect);??????????handler->setOnDataReceive(m_OnDataReceive);??????????handler->setOnDataSendSucceeded(m_OnDataSendSucceeded);??????????handler->setOnDataSendFailed(m_OnDataSendFailed);??????????return?handler;??????}??}???
?
復雜的部分在TTcpHandler,該類不僅需要接收數據(拼包),也要處理發送:
?
[cpp] view plain
copy #include?"TcpHandler.h"??namespace?igame??{??????TTcpHandler::TTcpHandler()??????????:m_CurDataMB(0)???????{?}??????TTcpHandler::~TTcpHandler()??????{??????????if?(handle()?!=?ACE_INVALID_HANDLE)??????????{??????????????ACE_OS::closesocket(handle());???#ifdef?_DEBUG????????????????????????????ACE_TCHAR?remoteAddrStr[128];????????????????????????????m_ClientAddr.addr_to_string(remoteAddrStr,?sizeof(remoteAddrStr)?/?sizeof(ACE_TCHAR));??????????????ACE_DEBUG((LM_INFO,?ACE_TEXT("Disconnect?from?%s/n"),?remoteAddrStr));??#endif????????????????????????????m_OnClientDisconnect(m_ClientAddr.get_ip_address(),?m_ClientAddr.get_port_number());??????????????if?(m_CurDataMB)??????????????????m_CurDataMB->release();??????????}??????}??????int?TTcpHandler::send(unsigned?int?seq,?const?char*?data,?unsigned?short?dataSize)??????{??????????ACE_Message_Block*?dataMB?=?0;????????????????????ACE_NEW_NORETURN(dataMB,?ACE_Message_Block(sizeof(unsigned?int)?+?sizeof(unsigned?short)?+?dataSize));????????????????????short?len?=?dataSize;??????????dataMB->copy((const?char?*)&seq,?sizeof(unsigned?int));???????????dataMB->copy((const?char?*)&len,?sizeof(unsigned?short));??????????dataMB->copy((const?char?*)data,?dataSize);????????????????????????int?ret?=?m_Writer.write(*dataMB,?dataMB->length());???????????if?(ret?==?-1)??????????????m_OnDataSendFailed(m_ClientAddr.get_ip_address(),?m_ClientAddr.get_port_number(),?seq,?data,?dataSize);??????????else??????????????m_OnDataSendSucceeded(m_ClientAddr.get_ip_address(),?m_ClientAddr.get_port_number(),?seq,?data,?dataSize);??????????return?ret;??????}??????void?TTcpHandler::addresses?(const?ACE_INET_Addr?&remote_address,????????????????????????????const?ACE_INET_Addr?&local_address)??????{??????????m_ClientAddr?=?remote_address;???????}??????void?TTcpHandler::open(ACE_HANDLE?h,?ACE_Message_Block&?mb)??????{??????????handle(h);?????????????????????if?(m_Reader.open(*this)?==?-1)???????????{??????????????ACE_ERROR((LM_ERROR,?ACE_TEXT("failed?to?open?read?handle?%i/n"),?errno));??????????????delete?this;??????????????return;??????????}??????????if?(m_Writer.open(*this)?==?-1)???????????{??????????????ACE_ERROR((LM_ERROR,?ACE_TEXT("failed?to?open?write?handle?%i/n"),?errno));??????????????delete?this;??????????????return;??????????}??????????????????????????????m_OnClientConnect(m_ClientAddr.get_ip_address(),?m_ClientAddr.get_port_number(),?this);??????????initCurDataMB();????????????????????m_Reader.read(*m_CurDataMB,?m_CurDataMB->space());???????}??????void?TTcpHandler::handle_read_stream(const?ACE_Asynch_Read_Stream::Result&?result)??????{??????????ACE_Message_Block&?mb?=?result.message_block();??????????if?(!result.success()?||?result.bytes_transferred()?==?0)???????????{??????????????mb.release();??????????????delete?this;??????????}??????????else??????????{??????????????if?(this->m_CurDataMB->length()?<?TCP_PACK_HEADER_SIZE)???????????????{??????????????????this->m_Reader.read(*m_CurDataMB,?m_CurDataMB->space());??????????????????return?;??????????????}??????????????TTcpPackHeader*?header?=?reinterpret_cast<TTcpPackHeader?*>(this->m_CurDataMB->rd_ptr());??????????????ACE_Message_Block*?dataMB?=?this->m_CurDataMB->cont();??????????????if?(!dataMB)??????????????{??????????????????ACE_NEW_NORETURN(dataMB,?ACE_Message_Block(header->len));??????????????????if?(dataMB)??????????????????????this->m_CurDataMB->cont(dataMB);??????????????????else??????????????????{??????????????????????this->m_CurDataMB->release();??????????????????????ACE_DEBUG((LM_ERROR,?ACE_TEXT("Failed?to?allocated:?%i/n"),?errno));??????????????????????delete?this;??????????????????????return?;??????????????????}??????????????}????????????????????????????if?(dataMB->length()?==?header->len)??????????????{????????????????????????????????????m_OnDataReceive(m_ClientAddr.get_ip_address(),?m_ClientAddr.get_port_number(),?header->seq,?dataMB->rd_ptr(),?header->len);????????????????????????????????????m_CurDataMB->release();??????????????????initCurDataMB();?????????????????????????????????????this->m_Reader.read(*m_CurDataMB,?m_CurDataMB->space());???????????????????return?;??????????????}????????????????????????????this->m_Reader.read(*dataMB,?dataMB->space());???????????}??????}??????void?TTcpHandler::handle_write_stream(const?ACE_Asynch_Write_Stream::Result&?result)??????{??????????if?(result.success()?&&?result.bytes_transferred()?>?0)???????????{??????????????ACE_Message_Block&?mb?=?result.message_block();??#ifdef?_DEBUG??????????????ACE_TCHAR?addrStr[128];????????????????????????????m_ClientAddr.addr_to_string(addrStr,?sizeof(addrStr)?/?sizeof(ACE_TCHAR));????????????????????????????ACE_DEBUG((LM_INFO,?ACE_TEXT("Send?to?client:?%s?len:%i/n"),?addrStr,?result.bytes_transferred()));????????????????????????????char*?ptr?=?mb.rd_ptr();????????????????#endif????????????????????????????mb.release();??????????}??????}??????void?TTcpHandler::initCurDataMB()??????{??????????ACE_NEW_NORETURN(m_CurDataMB,?ACE_Message_Block(TCP_PACK_HEADER_SIZE,?TCP_DATA_RECEIVE));??????}??}???
?
然后是TTcpNetThread,該類的實現也相當簡單:
[cpp] view plain
copy #include?<ace/Proactor.h>??#include?"TCPNetThread.h"??namespace?igame??{??????int?TTcpNetThread::open()?{?return?this->activate();?}??????int?TTcpNetThread::close()??????{??????????ACE_Proactor::instance()->proactor_end_event_loop();???????????this->wait();???????????return?0;??????}????????????int?TTcpNetThread::svc()??????{??????????ACE_INET_Addr?listenAddr(DEF_LISTENING_PORT);???????????TTcpAcceptor?tcpAcceptor;?????????????????????tcpAcceptor.setOnClientConnect(m_OnClientConnect);??????????tcpAcceptor.setOnClientDisconnect(m_OnClientDisconnect);??????????tcpAcceptor.setOnClientValidate(m_OnClientValidate);??????????tcpAcceptor.setOnDataReceive(m_OnDataReceive);??????????tcpAcceptor.setOnDataSendFailed(m_OnDataSendFailed);??????????tcpAcceptor.setOnDataSendSucceeded(m_OnDataSendSucceeded);??????????????????????????????if?(tcpAcceptor.open(listenAddr,?0,?1,?5,?1,?0,?0)?!=?0)??????????????ACE_ERROR_RETURN((LM_ERROR,?ACE_TEXT("%p/n"),?ACE_TEXT("failed?to?open?TcpAcceptor?errno=%i/n"),?errno),?-1);????????????????????ACE_Proactor::instance()->proactor_run_event_loop();??????????ACE_DEBUG((LM_DEBUG,?ACE_TEXT("Network?fin/n")));??????????return?0;??????}??}???
?
最后,對以上三個類進行聚合,封裝,就成了TTcp類,在此之前,先定義消息類型:
[cpp] view plain
copy ???????#define?TCP_DATA_RECEIVE????????????0x5505????#define?TCP_CLIENT_CONNECT??????????0x5506????#define?TCP_CLIENT_DISCONNECT???????0x5507????#define?TCP_DATA_SEND???????????????0x5508????#define?TCP_DATA_SEND_SUCCEEDED?????0x5509????#define?TCP_DATA_SEND_FAILED????????0x550A????????#define?DEF_LISTENING_PORT??777??
現在看看TTcp的實現:
唔,太長了,下一篇吧。
?
?
?
此乃末技,
不知何用。
堆砌字數,
湊成更新。
走過路過,
不要錯過。
?請原諒偶拖篇幅,這里奉上拖欠的數字。
TTcp的實現如下:
?
[cpp] view plain
copy #include?"Tcp.h"??namespace?igame??{??????TTcp::TTcp()??????????:m_TcpNetThd(0)??????{??????????ACE_NEW_NORETURN(m_TcpNetThd,?TTcpNetThread());???????}??????TTcp::~TTcp()??????{??????????if?(m_TcpNetThd)???????????????delete?m_TcpNetThd;??????}??????void?TTcp::open()??????{??????????ACE_TRACE("TTcp::open");????????????????????????????????????????if?(m_TcpNetThd)??????????{??????????????m_TcpNetThd->setOnClientConnect(EVENT(TTcpHandler::TOnClientConnect,?TTcp,?this,?tcpNetThread_OnClientConnect));??????????????m_TcpNetThd->setOnClientDisconnect(EVENT(TTcpHandler::TOnClientDisconnect,?TTcp,?this,?tcpNetThread_OnClientDisconnect));??????????????m_TcpNetThd->setOnClientValidate(m_OnClientValidate);??????????????m_TcpNetThd->setOnDataReceive(EVENT(TTcpHandler::TOnDataReceive,?TTcp,?this,?tcpNetThread_OnDataReceive));??????????????m_TcpNetThd->setOnDataSendFailed(EVENT(TTcpHandler::TOnDataSendFailed,?TTcp,?this,?tcpNetThread_OnDataSendFailed));??????????????m_TcpNetThd->setOnDataSendSucceeded(EVENT(TTcpHandler::TOnDataSendSucceeded,?TTcp,?this,?tcpNetThread_OnDataSendSucceeded));??????????}??????????if?(activate()?==?-1)??????????????ACE_DEBUG((LM_ERROR,?ACE_TEXT("Resume?thread?failed")));??????}??????void?TTcp::close()??????{??????????if?(m_TcpNetThd)??????????????m_TcpNetThd->close();??????????ACE_TRACE("TTcp::close");??????????ACE_Message_Block*?termBlock;?????????????????????ACE_NEW_NORETURN(termBlock,?ACE_Message_Block(0,?ACE_Message_Block::MB_HANGUP));??????????if?(!termBlock)??????????????ACE_DEBUG((LM_ERROR,?ACE_TEXT("Allocate?failed?%i"),?errno));??????????else??????????{??????????????putq(termBlock);??????????????wait();??????????}??????}??????int?TTcp::send(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?buf,?unsigned?short?len)??????{??????????ACE_Message_Block*?mb?=?0;???????????ACE_NEW_RETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16)?+?sizeof(unsigned?int)?+?sizeof(unsigned?short)?+?len,?TCP_DATA_SEND),?-1);??????????????????????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));??????????mb->copy((const?char?*)&seq,?sizeof(unsigned?int));??????????mb->copy((const?char?*)&len,?sizeof(unsigned?short));??????????mb->copy(buf,?len);??????????return?putq(mb);??????}??????int?TTcp::svc()??????{??????????ACE_TRACE("TTcp::svc");??????????if?(m_TcpNetThd->open()?==?-1)??????????????ACE_DEBUG((LM_ERROR,?ACE_TEXT("Failed?to?pen?TTcpNetThread:?%i"),?errno));??????????ACE_Message_Block*?msg?=?0;??????????while(true)??????????{??????????????if?(getq(msg)?==?-1)??????????????{??????????????????ACE_ERROR_RETURN((LM_ERROR,?ACE_TEXT("Failed?to?getq?%i"),?errno),?-1);??????????????}??????????????switch(msg->msg_type())??????????????{??????????????case?ACE_Message_Block::MB_HANGUP:???????????????????{??????????????????????ACE_DEBUG((LM_DEBUG,?ACE_TEXT("Quit")));??????????????????????msg->release();??????????????????????return?0;??????????????????}??????????????????break;??????????????case?TCP_CLIENT_CONNECT:???????????????????{??????????????????????int?len?=?msg->length();??????????????????????int?hLen?=?sizeof(TTcpHandler?*);??????????????????????if?(msg->length()?!=?TCP_PACK_HEADER_SIZE?+?sizeof(TTcpHandler?*))??????????????????????????ACE_ERROR_RETURN((LM_ERROR,?ACE_TEXT("Tcp?connection?message?block?invalid!")),?-1);??????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;?ptr?+=?sizeof(ACE_UINT16);??????????????????????TTcpHandler*?handler?=?(TTcpHandler?*)(*(int?*)ptr);??????????????????????{??????????????????????????ACE_Guard<ACE_Recursive_Thread_Mutex>?lock(m_Lock);??????????????????????????m_AddrMap.insert(make_pair<unsigned?__int64,?TTcpHandler?*>((unsigned?__int64)ip?<<?32?|?port,?handler));???????????????????????}???????????????????????????????????????????????????????????m_OnClientConnect(ip,?port,?handler);??????????????????}??????????????????break;??????????????case?TCP_CLIENT_DISCONNECT:???????????????????{??????????????????????if?(msg->length()?!=?sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16))??????????????????????????ACE_ERROR_RETURN((LM_ERROR,?ACE_TEXT("Invalid?tcp?disconnect?message?block/n")),?-1);??????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;??????????????????????{??????????????????????????ACE_Guard<ACE_Recursive_Thread_Mutex>?lock(m_Lock);??????????????????????????m_AddrMap.erase((unsigned?__int64)ip?<<?32?|?port);??????????????????????}??????????????????????m_OnClientDisconnect(ip,?port);??????????????????}??????????????????break;??????????????case?TCP_DATA_RECEIVE:??????????????????{??????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;?ptr?+=?sizeof(ACE_UINT16);??????????????????????TTcpPackHeader*?header?=?(TTcpPackHeader?*)ptr;?ptr?+=?TCP_PACK_HEADER_SIZE;??????????????????????const?char*?data?=?ptr;??????????????????????m_OnDataReceive(ip,?port,?header->seq,?data,?header->len);??????????????????}??????????????????break;??????????????case?TCP_DATA_SEND:??????????????????{??????????????????????if?(msg->length()?>?sizeof(TTcpPackHeader))??????????????????????{??????????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;?ptr?+=?sizeof(ACE_UINT16);??????????????????????????unsigned?int?seq?=?*(unsigned?int?*)ptr;?ptr?+=?sizeof(unsigned?int);??????????????????????????unsigned?short?len?=?*(unsigned?short?*)ptr;?ptr?+=?sizeof(unsigned?short);??????????????????????????const?char*?data?=?ptr;????????????????????????????????????????????????????{??????????????????????????????ACE_Guard<ACE_Recursive_Thread_Mutex>?_lock(m_Lock);??????????????????????????????hash_map<unsigned?__int64,?TTcpHandler?*>::iterator?it?=?m_AddrMap.find((unsigned?__int64)ip?<<?32?|?port);??????????????????????????????if?(it?!=?m_AddrMap.end())??????????????????????????????{??????????????????????????????????(*it).second->send(seq,?data,?len);??????????????????????????????}??????????????????????????}??????????????????????}??????????????????}??????????????????break;??????????????case?TCP_DATA_SEND_SUCCEEDED:??????????????????{??????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;?ptr?+=?sizeof(ACE_UINT16);??????????????????????TTcpPackHeader*?header?=?(TTcpPackHeader?*)ptr;?ptr?+=?TCP_PACK_HEADER_SIZE;??????????????????????const?char*?data?=?ptr;??????????????????????m_OnDataSendSucceeded(ip,?port,?header->seq,?data,?header->len);??????????????????}??????????????????break;??????????????case?TCP_DATA_SEND_FAILED:??????????????????{??????????????????????char*?ptr?=?msg->rd_ptr();??????????????????????ACE_UINT32?ip?=?*(ACE_UINT32?*)ptr;?ptr?+=?sizeof(ACE_UINT32);??????????????????????ACE_UINT16?port?=?*(ACE_UINT16?*)ptr;?ptr?+=?sizeof(ACE_UINT16);??????????????????????TTcpPackHeader*?header?=?(TTcpPackHeader?*)ptr;?ptr?+=?TCP_PACK_HEADER_SIZE;??????????????????????const?char*?data?=?ptr;??????????????????????m_OnDataSendFailed(ip,?port,?header->seq,?data,?header->len);??????????????????}??????????????????break;??????????????default:??????????????????{??????????????????????ACE_DEBUG((LM_ERROR,?ACE_TEXT("Unknown?ACE_Message_Block?type?%i/n"),?msg->msg_type()));??????????????????}??????????????????break;??????????????}???????????????msg->release();??????????}???????????return?0;??????}??????void?TTcp::tcpNetThread_OnClientConnect(ACE_UINT32?ip,?ACE_UINT16?port,?TTcpHandler*?handler)??????{??????????ACE_Message_Block*?mb?=?0;??????????ACE_NEW_NORETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16)?+?sizeof(TTcpHandler?*),?TCP_CLIENT_CONNECT));??????????if?(mb)??????????{??????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));??????????????mb->copy((const?char?*)&handler,?sizeof(TTcpHandler?*));??????????????this->putq(mb);??????????}??????}??????void?TTcp::tcpNetThread_OnClientDisconnect(ACE_UINT32?ip,?ACE_UINT16?port)??????{??????????ACE_Message_Block*?mb?=?0;??????????ACE_NEW_NORETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16),?TCP_CLIENT_DISCONNECT));??????????if?(mb)??????????{??????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));????????????????????????????this->putq(mb);??????????}??????}????????????void?TTcp::tcpNetThread_OnDataReceive(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size)??????{??????????ACE_Message_Block*?mb?=?0;??????????ACE_NEW_NORETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16)?+?TCP_PACK_HEADER_SIZE?+?size,?TCP_DATA_RECEIVE));??????????if?(mb)??????????{??????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));??????????????mb->copy((const?char?*)&seq,?sizeof(unsigned?int));??????????????mb->copy((const?char?*)&size,?sizeof(unsigned?short));??????????????mb->copy(data,?size);????????????????????????????this->putq(mb);??????????}??????}??????void?TTcp::tcpNetThread_OnDataSendSucceeded(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size)??????{??????????ACE_Message_Block*?mb?=?0;??????????ACE_NEW_NORETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16)?+?TCP_PACK_HEADER_SIZE?+?size,?TCP_DATA_SEND_SUCCEEDED));??????????if?(mb)??????????{??????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));??????????????mb->copy((const?char?*)&seq,?sizeof(unsigned?int));??????????????mb->copy((const?char?*)&size,?sizeof(unsigned?short));??????????????mb->copy(data,?size);????????????????????????????this->putq(mb);??????????}??????}??????void?TTcp::tcpNetThread_OnDataSendFailed(ACE_UINT32?ip,?ACE_UINT16?port,?unsigned?int?seq,?const?char*?data,?unsigned?short?size)??????{??????????ACE_Message_Block*?mb?=?0;??????????ACE_NEW_NORETURN(mb,?ACE_Message_Block(sizeof(ACE_UINT32)?+?sizeof(ACE_UINT16)?+?TCP_PACK_HEADER_SIZE?+?size,?TCP_DATA_SEND_FAILED));??????????if?(mb)??????????{??????????????mb->copy((const?char?*)&ip,?sizeof(ACE_UINT32));??????????????mb->copy((const?char?*)&port,?sizeof(ACE_UINT16));??????????????mb->copy((const?char?*)&seq,?sizeof(unsigned?int));??????????????mb->copy((const?char?*)&size,?sizeof(unsigned?short));??????????????mb->copy(data,?size);????????????????????????????this->putq(mb);??????????}??????}??}???
?
在完整的工程中,還有測試代碼,這里就不列出了。本來已經在下載頻道中上傳了,并設置下載點數為0,結果傳完后楞是自私都找不到?!NNDCSDN!!
這是下載資源。
?
來信到igame2000@hotmail.com
?
需要完整代碼的請來信索取吧,必復。
?
?
此乃末技。。。。
?
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀
總結
以上是生活随笔為你收集整理的ACE_Proactor网络通信示例的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。