Analysis of the UDT Protocol Implementation: Connection Establishment
Once a UDT server has executed UDT::listen(), it can accept connection requests from other nodes. Here we study how a UDT connection is established.
連接的發(fā)起
來看連接的發(fā)起方。如前面我們看到的那樣,UDT Client創(chuàng)建一個(gè)Socket,可以將該Socket綁定到某個(gè)端口,也可以不綁定,然后就可以調(diào)用UDT::connect()將這個(gè)Socket連接到UDT Server了。來看UDT::connect()的定義(src/api.cpp):
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);

   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // check the size of SOCKADDR structure
   if (AF_INET == s->m_iIPversion)
   {
      if (namelen != sizeof(sockaddr_in))
         throw CUDTException(5, 3, 0);
   }
   else
   {
      if (namelen != sizeof(sockaddr_in6))
         throw CUDTException(5, 3, 0);
   }

   // a socket can "connect" only if it is in INIT or OPENED status
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)
      {
         s->m_pUDT->open();
         updateMux(s);
         s->m_Status = OPENED;
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);

   // connect_complete() may be called before connect() returns.
   // So we need to update the status before connect() is called,
   // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);
   }
   catch (CUDTException &e)
   {
      s->m_Status = OPENED;
      throw e;
   }

   // record peer address
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*) (new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*) (new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}

int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen)
{
   try
   {
      return s_UDTUnited.connect(u, name, namelen);
   }
   catch (CUDTException &e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)
{
   return CUDT::connect(u, name, namelen);
}

The structure of the UDT::connect() API implementation is much the same as the other APIs, so we skip the wrappers and analyze CUDTUnited::connect() directly:
調(diào)用CUDTUnited::locate(),查找UDT Socket對應(yīng)的CUDTSocket結(jié)構(gòu)。若找不到,則拋出異常直接返回;否則,繼續(xù)執(zhí)行。
根據(jù)UDT Socket的IP版本,檢查目標(biāo)地址的有效性。若無效,則退出,否則繼續(xù)執(zhí)行。
檢查UDT Socket的狀態(tài)。確保只有處于INIT或OPENED狀態(tài)的UDT Socket才可以執(zhí)行connect()操作。新創(chuàng)建的UDT Socket處于INIT狀態(tài),bind之后UDT Socket處于OPENED狀態(tài)。如果UDT Socket處于INIT狀態(tài),且不是Rendezvous模式,還會執(zhí)行s->m_pUDT->open(),將UDT Socket與多路復(fù)用器CMultiplexer,然后將狀態(tài)置為OPENED。
We already saw CUDTUnited::updateMux(), which associates a UDT socket with a CMultiplexer, during the walkthrough of bind. The difference here is that updateMux() is called with neither a valid system UDP socket nor a valid local address. Recall from the implementation of updateMux() that those two parameters mainly determine which port the CMultiplexer's CChannel is bound to. Consider the two CChannel::open() overloads (src/channel.cpp).
CChannel::open() mainly associates UDT's CChannel with a system UDP socket, and between them the overloads handle three cases. First, the caller has already created a system UDP socket bound to the desired port; this is the simplest case: the UDPSOCKET passed in is assigned directly to the CChannel's m_iSocket, and the UDP socket options are set. Second, a valid local address is passed in; CChannel first creates a system UDP socket itself and binds it to that address. Cases one and two correspond exactly to UDT's two bind APIs. Third, neither a valid system UDP socket nor a valid local address is passed in; after creating the system UDP socket, a free port is found and the socket is bound to it. This third case is exactly what happens when a UDT socket connects without having been bound.
將UDT Socket的狀態(tài)置為CONNECTING。
執(zhí)行s->m_pUDT->connect(name),連接UDT Server。如果連接失敗,有異常拋出,UDT Socket的狀態(tài)會退回到OPENED狀態(tài),然后返回。在這個(gè)函數(shù)中會完成建立連接整個(gè)的網(wǎng)絡(luò)消息交互過程。
將連接的目標(biāo)地址復(fù)制到UDT Socket的Peer Address。然后返回0表示成功結(jié)束。
在仔細(xì)地分析連接建立過程中的數(shù)據(jù)包交互之前,可以先粗略地看一下這個(gè)過程收發(fā)了幾個(gè)包,及各個(gè)包收發(fā)的順序。我們知道在UDT中,所有數(shù)據(jù)包的收發(fā)都是通過CChannel完成的,我們可以在CChannel::sendto()和CChannel::recvfrom()中加log來track這一過程。通過UDT提供的demo程序appserver和appclient(在app/目錄下)來研究。先在一個(gè)終端下執(zhí)行appserver:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver
server is ready at port: 9000

Modify appclient so that it sends just one small packet and exits; after building it, run it in another terminal. Logs like the following appear:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appclient 127.0.0.1 9000
To connect
CRcvQueue::registerConnector
Send packet 0
Receive packet 364855723 unit->m_Packet.m_iID 364855723
Send packet 0
Receive packet 364855723 unit->m_Packet.m_iID 364855723
To send data.
send 10 bytes
Send packet 1020108693
Receive packet 364855723 unit->m_Packet.m_iID 364855723
Send packet 1020108693
Receive packet 364855723 unit->m_Packet.m_iID 364855723
Send packet 1020108693
Receive packet 364855723 unit->m_Packet.m_iID 364855723
Send packet 1020108693

While appclient runs, the terminal running appserver prints logs like these:
xxxxxx@ThundeRobot:/media/data/downloads/hudt/app$ ./appserver
server is ready at port: 9000
Receive packet 0 unit->m_Packet.m_iID 0
Send packet 364855723
Receive packet 0 unit->m_Packet.m_iID 0
new CUDTSocket SocketID is 1020108693 PeerID 364855723
Send packet 364855723
new connection: 127.0.0.1:59847
Receive packet 1020108693 unit->m_Packet.m_iID 1020108693
Send packet 364855723
Send packet 364855723
Send packet 364855723
Receive packet 1020108693 unit->m_Packet.m_iID 1020108693
Receive packet 1020108693 unit->m_Packet.m_iID 1020108693
Receive packet 1020108693 unit->m_Packet.m_iID 1020108693
recv:Connection was broken.

From this we can see the following: the UDT client first sends a message MSG1 to the UDT server. On receiving MSG1, the server replies with MSG2. On receiving MSG2, the client replies with MSG3. On receiving MSG3, the server replies with MSG4 and returns from UDT::accept(); from that point the server considers the connection successfully established. The client, on receiving MSG4, returns from UDT::connect() and likewise considers the connection established and ready for data transfer. The process is illustrated by the figure below:
[Figure: the four-message handshake sequence between UDT client and UDT server]
至于MSG1、2、3、4的具體格式及內(nèi)容,則留待我們后面來具體分析了。
Let's now look at the concrete implementation of the connection-setup message exchange, namely the CUDT::connect() function:
void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bListening)
      throw CUDTException(5, 2, 0);

   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // record peer/server address
   delete m_pPeerAddr;
   m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

   // register this socket in the rendezvous queue
   // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function
   uint64_t ttl = 3000000;
   if (m_bRendezvous)
      ttl *= 10;
   ttl += CTimer::getTime();
   m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);

   // This is my current configurations
   m_ConnReq.m_iVersion = m_iVersion;
   m_ConnReq.m_iType = m_iSockType;
   m_ConnReq.m_iMSS = m_iMSS;
   m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;
   m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
   m_ConnReq.m_iID = m_SocketID;
   CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

   // Random Initial Sequence Number
   srand((unsigned int) CTimer::getTime());
   m_iISN = m_ConnReq.m_iISN = (int32_t) (CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

   m_iLastDecSeq = m_iISN - 1;
   m_iSndLastAck = m_iISN;
   m_iSndLastDataAck = m_iISN;
   m_iSndCurrSeqNo = m_iISN - 1;
   m_iSndLastAck2 = m_iISN;
   m_ullSndLastAck2Time = CTimer::getTime();

   // Inform the server my configurations.
   CPacket request;
   char* reqdata = new char[m_iPayloadSize];
   request.pack(0, NULL, reqdata, m_iPayloadSize);
   // ID = 0, connection request
   request.m_iID = 0;

   int hs_size = m_iPayloadSize;
   m_ConnReq.serialize(reqdata, hs_size);
   request.setLength(hs_size);
   m_pSndQueue->sendto(serv_addr, request);
   m_llLastReqTime = CTimer::getTime();

   m_bConnecting = true;

   // asynchronous connect, return immediately
   if (!m_bSynRecving)
   {
      delete[] reqdata;
      return;
   }

   // Wait for the negotiated configurations from the peer side.
   CPacket response;
   char* resdata = new char[m_iPayloadSize];
   response.pack(0, NULL, resdata, m_iPayloadSize);

   CUDTException e(0, 0);

   while (!m_bClosing)
   {
      // avoid sending too many requests, at most 1 request per 250ms
      if (CTimer::getTime() - m_llLastReqTime > 250000)
      {
         m_ConnReq.serialize(reqdata, hs_size);
         request.setLength(hs_size);
         if (m_bRendezvous)
            request.m_iID = m_ConnRes.m_iID;
         m_pSndQueue->sendto(serv_addr, request);
         m_llLastReqTime = CTimer::getTime();
      }

      response.setLength(m_iPayloadSize);
      if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
      {
         if (connect(response) <= 0)
            break;

         // new request/response should be sent out immediately on receving a response
         m_llLastReqTime = 0;
      }

      if (CTimer::getTime() > ttl)
      {
         // timeout
         e = CUDTException(1, 1, 0);
         break;
      }
   }

   delete[] reqdata;
   delete[] resdata;

   if (e.getErrorCode() == 0)
   {
      if (m_bClosing)  // if the socket is closed before connection...
         e = CUDTException(1);
      else if (1002 == m_ConnRes.m_iReqType)  // connection request rejected
         e = CUDTException(1, 2, 0);
      else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))  // secuity check
         e = CUDTException(1, 4, 0);
   }

   if (e.getErrorCode() != 0)
      throw e;
}

As you can see, this function mainly does the following:
Check the CUDT state: only a UDT socket already associated with a multiplexer, i.e. in the OPENED state, may execute CUDT::connect(). As we saw earlier, a bind puts a UDT socket into the OPENED state; for a socket that was never bound, CUDTUnited::connect() takes care of this.
拷貝目標(biāo)網(wǎng)絡(luò)地址為UDT Socket的PeerAddr。
執(zhí)行m_pRcvQueue->registerConnector()向接收隊(duì)列注冊Connector。來看這個(gè)函數(shù)的執(zhí)行過程(src/queue.cpp):
可以看到,在這個(gè)函數(shù)中,主要是向接收隊(duì)列CRcvQueue的CRendezvousQueue m_pRendezvousQueue中插入了一個(gè)CRL結(jié)構(gòu)。那CRendezvousQueue又是個(gè)什么東西呢?來看它的定義(src/queue.h):
class CRendezvousQueue
{
public:
   CRendezvousQueue();
   ~CRendezvousQueue();

public:
   void insert(const UDTSOCKET& id, CUDT* u, int ipv, const sockaddr* addr, uint64_t ttl);
   void remove(const UDTSOCKET& id);
   CUDT* retrieve(const sockaddr* addr, UDTSOCKET& id);

   void updateConnStatus();

private:
   struct CRL
   {
      UDTSOCKET m_iID;        // UDT socket ID (self)
      CUDT* m_pUDT;           // UDT instance
      int m_iIPversion;       // IP version
      sockaddr* m_pPeerAddr;  // UDT sonnection peer address
      uint64_t m_ullTTL;      // the time that this request expires
   };
   std::list<CRL> m_lRendezvousID;  // The sockets currently in rendezvous mode

   pthread_mutex_t m_RIDVectorLock;
};

As you can see, it is just a simple container, with the usual insert, remove, and retrieve operations:
void CRendezvousQueue::remove(const UDTSOCKET& id)
{
   CGuard vg(m_RIDVectorLock);

   for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
   {
      if (i->m_iID == id)
      {
         if (AF_INET == i->m_iIPversion)
            delete (sockaddr_in*) i->m_pPeerAddr;
         else
            delete (sockaddr_in6*) i->m_pPeerAddr;

         m_lRendezvousID.erase(i);

         return;
      }
   }
}

CUDT* CRendezvousQueue::retrieve(const sockaddr* addr, UDTSOCKET& id)
{
   CGuard vg(m_RIDVectorLock);

   // TODO: optimize search
   for (list<CRL>::iterator i = m_lRendezvousID.begin(); i != m_lRendezvousID.end(); ++i)
   {
      if (CIPAddress::ipcmp(addr, i->m_pPeerAddr, i->m_iIPversion) && ((0 == id) || (id == i->m_iID)))
      {
         id = i->m_iID;
         return i->m_pUDT;
      }
   }

   return NULL;
}

So what does the receive queue CRcvQueue use this queue for? It is tied to CRcvQueue's message dispatch mechanism. When the receive queue's worker thread receives a message, it dispatches it in different ways based on the destination SocketID and the sender's address; the CUDT objects in m_pRendezvousQueue are one class of dispatch targets. We will come back to the worker thread and m_pRendezvousQueue in detail when we study message reception.
In the CHandShake, m_iID is the SocketID of the initiating UDT socket, the request type m_iReqType is set to 1, and m_iMSS is set for negotiating the MSS value. The CHandShake constructor initializes all fields (src/packet.cpp):
CHandShake::CHandShake()
    : m_iVersion(0),
      m_iType(0),
      m_iISN(0),
      m_iMSS(0),
      m_iFlightFlagSize(0),
      m_iReqType(0),
      m_iID(0),
      m_iCookie(0)
{
   for (int i = 0; i < 4; ++i)
      m_piPeerIP[i] = 0;
}

Note that m_iCookie is initialized to 0, and that when CHandShake m_ConnReq is built here, m_iCookie is not assigned a new value.
隨機(jī)初始化序列號Sequence Number。
創(chuàng)建一個(gè)CPacket結(jié)構(gòu)request,為它創(chuàng)建大小為m_iPayloadSize的緩沖區(qū),將該緩沖區(qū)pack進(jìn)CPacket結(jié)構(gòu),并專門把request.m_iID,也就是這個(gè)包發(fā)送的目的UDT SocketID,設(shè)置為0。
m_iPayloadSize is set in different places depending on who created the UDT socket. For sockets created by the application it is set in CUDT::open(): a listening UDT socket executes CUDT::open() during bind, and a socket that connects to a UDT server without ever being bound executes CUDT::open() in CUDTUnited::connect(). For sockets created in the UDT server by a listening UDT socket receiving a connection request, it is first set in CUDT::connect(const sockaddr* peer, CHandShake* hs). A connecting socket also updates this value again in CUDT::connect(const CPacket& response). In all cases it is set to m_iPktSize - CPacket::m_iPktHdrSize, where CPacket::m_iPktHdrSize is the fixed 16-byte UDT packet header size.
m_iPktSize is always set in the same places as m_iPayloadSize, to m_iMSS - 28. m_iMSS is the MSS (Maximum Segment Size), an option defined by the UDT protocol that the two peers negotiate at connection setup; it bounds the maximum amount of data each segment can carry. It is initialized to 1500 when the CUDT object is created, but can be changed via UDT::setsockopt().
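With the default configuration the arithmetic works out as follows; the 28 bytes subtracted are the IPv4 header (20 bytes) plus the UDP header (8 bytes):

// Packet-size bookkeeping for the defaults (IPv4):
const int mss          = 1500;           // m_iMSS, settable via UDT::setsockopt()
const int pkt_size     = mss - 28;       // m_iPktSize: 1472 after IP (20) + UDP (8) headers
const int payload_size = pkt_size - 16;  // m_iPayloadSize: 1456 after the 16-byte UDT header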
Let's first look at the CPacket structure (src/packet.h):
class CPacket
{
   friend class CChannel;
   friend class CSndQueue;
   friend class CRcvQueue;

public:
   int32_t& m_iSeqNo;      // alias: sequence number
   int32_t& m_iMsgNo;      // alias: message number
   int32_t& m_iTimeStamp;  // alias: timestamp
   int32_t& m_iID;         // alias: socket ID
   char*& m_pcData;        // alias: data/control information

   static const int m_iPktHdrSize;  // packet header size

public:
   CPacket();
   ~CPacket();

   // Functionality:
   //    Get the payload or the control information field length.
   // Parameters:
   //    None.
   // Returned value:
   //    the payload or the control information field length.
   int getLength() const;

   // Functionality:
   //    Set the payload or the control information field length.
   // Parameters:
   //    0) [in] len: the payload or the control information field length.
   // Returned value:
   //    None.
   void setLength(int len);

   // Functionality:
   //    Pack a Control packet.
   // Parameters:
   //    0) [in] pkttype: packet type filed.
   //    1) [in] lparam: pointer to the first data structure, explained by the packet type.
   //    2) [in] rparam: pointer to the second data structure, explained by the packet type.
   //    3) [in] size: size of rparam, in number of bytes;
   // Returned value:
   //    None.
   void pack(int pkttype, void* lparam = NULL, void* rparam = NULL, int size = 0);

   // Functionality:
   //    Read the packet vector.
   // Parameters:
   //    None.
   // Returned value:
   //    Pointer to the packet vector.
   iovec* getPacketVector();

   // Functionality:
   //    Read the packet flag.
   // Parameters:
   //    None.
   // Returned value:
   //    packet flag (0 or 1).
   int getFlag() const;

   // Functionality:
   //    Read the packet type.
   // Parameters:
   //    None.
   // Returned value:
   //    packet type filed (000 ~ 111).
   int getType() const;

   // Functionality:
   //    Read the extended packet type.
   // Parameters:
   //    None.
   // Returned value:
   //    extended packet type filed (0x000 ~ 0xFFF).
   int getExtendedType() const;

   // Functionality:
   //    Read the ACK-2 seq. no.
   // Parameters:
   //    None.
   // Returned value:
   //    packet header field (bit 16~31).
   int32_t getAckSeqNo() const;

   // Functionality:
   //    Read the message boundary flag bit.
   // Parameters:
   //    None.
   // Returned value:
   //    packet header field [1] (bit 0~1).
   int getMsgBoundary() const;

   // Functionality:
   //    Read the message inorder delivery flag bit.
   // Parameters:
   //    None.
   // Returned value:
   //    packet header field [1] (bit 2).
   bool getMsgOrderFlag() const;

   // Functionality:
   //    Read the message sequence number.
   // Parameters:
   //    None.
   // Returned value:
   //    packet header field [1] (bit 3~31).
   int32_t getMsgSeq() const;

   // Functionality:
   //    Clone this packet.
   // Parameters:
   //    None.
   // Returned value:
   //    Pointer to the new packet.
   CPacket* clone() const;

protected:
   uint32_t m_nHeader[4];    // The 128-bit header field
   iovec m_PacketVector[2];  // The 2-demension vector of UDT packet [header, data]

   int32_t __pad;

protected:
   CPacket& operator=(const CPacket&);
};

Its data members are m_nHeader, an array of four uint32_t elements describing the UDT packet header, and m_PacketVector, an iovec array with two elements. The several references are mainly aliases for convenient access to those members, which becomes obvious from CPacket's constructor (src/packet.cpp):
// Set up the aliases in the constructure
CPacket::CPacket()
    : m_iSeqNo((int32_t&) (m_nHeader[0])),
      m_iMsgNo((int32_t&) (m_nHeader[1])),
      m_iTimeStamp((int32_t&) (m_nHeader[2])),
      m_iID((int32_t&) (m_nHeader[3])),
      m_pcData((char*&) (m_PacketVector[1].iov_base)),
      __pad()
{
   for (int i = 0; i < 4; ++i)
      m_nHeader[i] = 0;
   m_PacketVector[0].iov_base = (char *) m_nHeader;
   m_PacketVector[0].iov_len = CPacket::m_iPktHdrSize;
   m_PacketVector[1].iov_base = NULL;
   m_PacketVector[1].iov_len = 0;
}

Note that the first element of m_PacketVector points at m_nHeader.
In CPacket::pack():
void CPacket::pack(int pkttype, void* lparam, void* rparam, int size)
{
   // Set (bit-0 = 1) and (bit-1~15 = type)
   m_nHeader[0] = 0x80000000 | (pkttype << 16);

   // Set additional information and control information field
   switch (pkttype)
   {
   case 2:  //0010 - Acknowledgement (ACK)
      // ACK packet seq. no.
      if (NULL != lparam)
         m_nHeader[1] = *(int32_t *) lparam;

      // data ACK seq. no.
      // optional: RTT (microsends), RTT variance (microseconds) advertised flow window size (packets), and estimated link capacity (packets per second)
      m_PacketVector[1].iov_base = (char *) rparam;
      m_PacketVector[1].iov_len = size;

      break;

   case 6:  //0110 - Acknowledgement of Acknowledgement (ACK-2)
      // ACK packet seq. no.
      m_nHeader[1] = *(int32_t *) lparam;

      // control info field should be none
      // but "writev" does not allow this
      m_PacketVector[1].iov_base = (char *) &__pad;  //NULL;
      m_PacketVector[1].iov_len = 4;  //0;

      break;

   case 3:  //0011 - Loss Report (NAK)
      // loss list
      m_PacketVector[1].iov_base = (char *) rparam;
      m_PacketVector[1].iov_len = size;

      break;

   case 4:  //0100 - Congestion Warning
      // control info field should be none
      // but "writev" does not allow this
      m_PacketVector[1].iov_base = (char *) &__pad;  //NULL;
      m_PacketVector[1].iov_len = 4;  //0;

      break;

   case 1:  //0001 - Keep-alive
      // control info field should be none
      // but "writev" does not allow this
      m_PacketVector[1].iov_base = (char *) &__pad;  //NULL;
      m_PacketVector[1].iov_len = 4;  //0;

      break;

   case 0:  //0000 - Handshake
      // control info filed is handshake info
      m_PacketVector[1].iov_base = (char *) rparam;
      m_PacketVector[1].iov_len = size;  //sizeof(CHandShake);

      break;

   case 5:  //0101 - Shutdown
      // control info field should be none
      // but "writev" does not allow this
      m_PacketVector[1].iov_base = (char *) &__pad;  //NULL;
      m_PacketVector[1].iov_len = 4;  //0;

      break;

   case 7:  //0111 - Message Drop Request
      // msg id
      m_nHeader[1] = *(int32_t *) lparam;

      //first seq no, last seq no
      m_PacketVector[1].iov_base = (char *) rparam;
      m_PacketVector[1].iov_len = size;

      break;

   case 8:  //1000 - Error Signal from the Peer Side
      // Error type
      m_nHeader[1] = *(int32_t *) lparam;

      // control info field should be none
      // but "writev" does not allow this
      m_PacketVector[1].iov_base = (char *) &__pad;  //NULL;
      m_PacketVector[1].iov_len = 4;  //0;

      break;

   case 32767:  //0x7FFF - Reserved for user defined control packets
      // for extended control packet
      // "lparam" contains the extended type information for bit 16 - 31
      // "rparam" is the control information
      m_nHeader[0] |= *(int32_t *) lparam;

      if (NULL != rparam)
      {
         m_PacketVector[1].iov_base = (char *) rparam;
         m_PacketVector[1].iov_len = size;
      }
      else
      {
         m_PacketVector[1].iov_base = (char *) &__pad;
         m_PacketVector[1].iov_len = 4;
      }

      break;

   default:
      break;
   }
}

In CPacket::pack(), bit-0 of m_nHeader[0] (i.e. of m_iSeqNo) is first set to 1 to mark a control packet, and bits 1-15 are set to the message type; then each type is handled differently. For a handshake message, pkttype is 0, so the case we care about here is pkttype 0: it simply points m_PacketVector[1] at the buffer created earlier.
序列化時(shí),會將Handshake消息m_ConnReq全部的內(nèi)容拷貝進(jìn)緩沖區(qū)。略感奇怪,這個(gè)地方竟然完全沒有顧及字節(jié)序的問題。
CSndQueue's sendto() simply calls CChannel::sendto():
int CChannel::sendto(const sockaddr* addr, CPacket& packet) const
{
   cout << "CChannel send packet " << packet.m_iID << endl << endl;
   // convert control information into network order
   if (packet.getFlag())
      for (int i = 0, n = packet.getLength() / 4; i < n; ++i)
         *((uint32_t *) packet.m_pcData + i) = htonl(*((uint32_t *) packet.m_pcData + i));

   // convert packet header into network order
   //for (int j = 0; j < 4; ++ j)
   //   packet.m_nHeader[j] = htonl(packet.m_nHeader[j]);
   uint32_t* p = packet.m_nHeader;
   for (int j = 0; j < 4; ++j)
   {
      *p = htonl(*p);
      ++p;
   }

#ifndef WIN32
   msghdr mh;
   mh.msg_name = (sockaddr*) addr;
   mh.msg_namelen = m_iSockAddrSize;
   mh.msg_iov = (iovec*) packet.m_PacketVector;
   mh.msg_iovlen = 2;
   mh.msg_control = NULL;
   mh.msg_controllen = 0;
   mh.msg_flags = 0;

   int res = ::sendmsg(m_iSocket, &mh, 0);
#else
   DWORD size = CPacket::m_iPktHdrSize + packet.getLength();
   int addrsize = m_iSockAddrSize;
   int res = ::WSASendTo(m_iSocket, (LPWSABUF)packet.m_PacketVector, 2, &size, 0, addr, addrsize, NULL, NULL);
   res = (0 == res) ? size : -1;
#endif

   // convert back into local host order
   //for (int k = 0; k < 4; ++ k)
   //   packet.m_nHeader[k] = ntohl(packet.m_nHeader[k]);
   p = packet.m_nHeader;
   for (int k = 0; k < 4; ++k)
   {
      *p = ntohl(*p);
      ++p;
   }

   if (packet.getFlag())
   {
      for (int l = 0, n = packet.getLength() / 4; l < n; ++l)
         *((uint32_t *) packet.m_pcData + l) = ntohl(*((uint32_t *) packet.m_pcData + l));
   }

   return res;
}

CChannel::sendto() is where the byte-order handling of the header, and of control-packet payloads, takes place.
這里總結(jié)一下,UDT Client向UDT Server發(fā)送的連接建立請求消息的內(nèi)容:消息主要分為兩個(gè)部分一個(gè)是消息的Header,一個(gè)是消息的Content。Header為4個(gè)uint32_t類型變量,從前到后這4個(gè)變量的含義分別為sequence number,message number,timestamp和目標(biāo)SocketID。就Handshake而言,sequence number的最高位,也就是bit-0為1,表示這是一個(gè)控制消息,bit-1~15為pkttype 0,其它位為0;message number及timestamp均為0,目標(biāo)SocketID為0。
The content part is 48 bytes in total and is mainly used for connection negotiation, e.g. of the MSS; see CHandShake for the details.
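Putting the two parts together, the first handshake request looks like this (values as assembled in CUDT::connect(), before CChannel::sendto() converts them to network byte order):

// First handshake request, as assembled in CUDT::connect():
//
// 16-byte CPacket header (4 x uint32_t, m_nHeader):
//   word 0: bit-0 = 1 (control packet), bits 1-15 = pkttype 0 (handshake), rest 0
//   word 1: message number       = 0
//   word 2: timestamp            = 0
//   word 3: destination SocketID = 0  (connection request)
//
// 48-byte CHandShake content (12 x int32_t):
//   m_iVersion, m_iType, m_iISN (random), m_iMSS,
//   m_iFlightFlagSize, m_iReqType = 1, m_iID (client's SocketID),
//   m_iCookie = 0, m_piPeerIP[4] (server IP from CIPAddress::ntop())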
我們暫時(shí)先看SynRecving模式,也就是默認(rèn)模式下的UDT Socket的行為。
創(chuàng)建一個(gè)CPacket response,同樣為它創(chuàng)建一個(gè)大小為m_iPayloadSize的緩沖區(qū)以存放數(shù)據(jù),并將緩沖區(qū)pack進(jìn)response中。這個(gè)CPacket response會被用來存放從UDT Server發(fā)回的相應(yīng)的信息。
進(jìn)入一個(gè)循環(huán)執(zhí)行后續(xù)的握手動作,及消息的超時(shí)重傳等動作。可以將這個(gè)循環(huán)看做由3個(gè)部分組成。
循環(huán)開始的地方是一段發(fā)送消息的代碼,在這段代碼中,其實(shí)做了兩個(gè)事情,或者說可能會發(fā)送兩種類型的消息,一是第一個(gè)握手消息的超時(shí)重傳,二是第二個(gè)握手消息的發(fā)送及超時(shí)重傳??瓷先グl(fā)送的都是CHandShake m_ConnReq,但在接收到第一個(gè)握手消息的響應(yīng)之后,這個(gè)結(jié)構(gòu)的某些成員會根據(jù)響應(yīng)而被修改。注意,發(fā)送第一個(gè)握手消息之后,首次進(jìn)入循環(huán),將會跳過這個(gè)部分。
之后的第二部分,主要用于接收響應(yīng),第一個(gè)握手消息的響應(yīng)及第二個(gè)握手消息的響應(yīng)。來看CRcvQueue::recvfrom()(src/queue.cpp):
int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
{
   CGuard bufferlock(m_PassLock);

   map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

   if (i == m_mBuffer.end())
   {
#ifndef WIN32
      uint64_t now = CTimer::getTime();
      timespec timeout;

      timeout.tv_sec = now / 1000000 + 1;
      timeout.tv_nsec = (now % 1000000) * 1000;

      pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);
#else
      ReleaseMutex(m_PassLock);
      WaitForSingleObject(m_PassCond, 1000);
      WaitForSingleObject(m_PassLock, INFINITE);
#endif

      i = m_mBuffer.find(id);
      if (i == m_mBuffer.end())
      {
         packet.setLength(-1);
         return -1;
      }
   }

   // retrieve the earliest packet
   CPacket* newpkt = i->second.front();

   if (packet.getLength() < newpkt->getLength())
   {
      packet.setLength(-1);
      return -1;
   }

   // copy packet content
   memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
   memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
   packet.setLength(newpkt->getLength());

   delete[] newpkt->m_pcData;
   delete newpkt;

   // remove this message from queue,
   // if no more messages left for this socket, release its data structure
   i->second.pop();
   if (i->second.empty())
      m_mBuffer.erase(i);

   return packet.getLength();
}

This is again a producer-consumer pattern, and as with the listen path we can only see one half of the story here, the consumer half; the producer is again the receive queue's worker thread. The call waits for a message to arrive, but not indefinitely: the wait is roughly one second. What it is waiting for is the appearance of a CPacket queue for the target UDT socket in m_mBuffer. The first packet is taken from that queue and returned to the caller; if the queue becomes empty, it is removed from m_mBuffer.
循環(huán)的第三部分是整個(gè)連接建立消息交互過程的超時(shí)處理,可以看到,非Rendezvous模式下超時(shí)時(shí)間為3s,Rendezvous模式下,超時(shí)時(shí)間則會延長十倍。
CUDT::connect()執(zhí)行到接收第一個(gè)握手消息的相應(yīng)時(shí),連接建立請求的發(fā)起也算是基本完成了。下面來看UDT Server端收到這個(gè)消息時(shí)是如何處理的。
How the UDT Server Handles the First Handshake Message
Let's see how the UDT server handles this message when it arrives. As we learned in the earlier article "UDT协议实现分析——bind、listen与accept" (on bind, listen, and accept), a listening UDT socket waits in UDT::accept() for connection requests to come in. That is a producer-consumer story, in which UDT::accept() is the consumer and the receive queue's worker thread is the producer.
我們這就來仔細(xì)地看一下RcvQueue的worker線程,當(dāng)然重點(diǎn)會關(guān)注對于Handshake消息,也就是目標(biāo)SocketID為0,pkttype為0的packet的處理(src/queue.cpp):
#ifndef WIN32
void* CRcvQueue::worker(void* param)
#else
DWORD WINAPI CRcvQueue::worker(LPVOID param)
#endif
{
   CRcvQueue* self = (CRcvQueue*) param;

   sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   CUDT* u = NULL;
   int32_t id;

   while (!self->m_bClosing)
   {
#ifdef NO_BUSY_WAITING
      self->m_pTimer->tick();
#endif

      // check waiting list, if new socket, insert it to the list
      while (self->ifNewEntry())
      {
         CUDT* ne = self->getNewEntry();
         if (NULL != ne)
         {
            self->m_pRcvUList->insert(ne);
            self->m_pHash->insert(ne->m_SocketID, ne);
         }
      }

      // find next available slot for incoming packet
      CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
      if (NULL == unit)
      {
         // no space, skip this packet
         CPacket temp;
         temp.m_pcData = new char[self->m_iPayloadSize];
         temp.setLength(self->m_iPayloadSize);
         self->m_pChannel->recvfrom(addr, temp);
         delete[] temp.m_pcData;
         goto TIMER_CHECK;
      }

      unit->m_Packet.setLength(self->m_iPayloadSize);

      // reading next incoming packet, recvfrom returns -1 is nothing has been received
      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
         goto TIMER_CHECK;

      id = unit->m_Packet.m_iID;

      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
      if (0 == id)
      {
         if (NULL != self->m_pListener)
            self->m_pListener->listen(addr, unit->m_Packet);
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {
            // asynchronous connect: call connect here
            // otherwise wait for the UDT socket to retrieve this packet
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }
      else if (id > 0)
      {
         if (NULL != (u = self->m_pHash->lookup(id)))
         {
            if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
            {
               if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
               {
                  if (0 == unit->m_Packet.getFlag())
                     u->processData(unit);
                  else
                     u->processCtrl(unit->m_Packet);

                  u->checkTimers();
                  self->m_pRcvUList->update(u);
               }
            }
         }
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }

TIMER_CHECK:
      // take care of the timing event for all UDT sockets
      uint64_t currtime;
      CTimer::rdtsc(currtime);

      CRNode* ul = self->m_pRcvUList->m_pUList;
      uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
      while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
      {
         CUDT* u = ul->m_pUDT;

         if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
         {
            u->checkTimers();
            self->m_pRcvUList->update(u);
         }
         else
         {
            // the socket must be removed from Hash table first, then RcvUList
            self->m_pHash->remove(u->m_SocketID);
            self->m_pRcvUList->remove(u);
            u->m_pRNode->m_bOnList = false;
         }

         ul = self->m_pRcvUList->m_pUList;
      }

      // Check connection requests status for all sockets in the RendezvousQueue.
      self->m_pRendezvousQueue->updateConnStatus();
   }

   if (AF_INET == self->m_UnitQueue.m_iIPversion)
      delete (sockaddr_in*) addr;
   else
      delete (sockaddr_in6*) addr;

#ifndef WIN32
   return NULL;
#else
   SetEvent(self->m_ExitCond);
   return 0;
#endif
}

This function first creates a sockaddr to hold the sender's address.
然后就進(jìn)入了一個(gè)循環(huán),不斷地接收UDP消息。
循環(huán)內(nèi)的第一行是執(zhí)行Timer的tick(),這個(gè)是UDT自己的定時(shí)器Timer機(jī)制的一部分。
接下來的這個(gè)子循環(huán)也主要與RcvQueue的worker線程中消息的dispatch機(jī)制有關(guān)。
然后是取一個(gè)CUnit,用來接收其它端點(diǎn)發(fā)送過來的消息。如果取不到,則接收UDP包并丟棄。然后跳過后面消息dispatch的過程。這個(gè)地方的m_UnitQueue用來做緩存,也用來防止收到過多的包消耗過多的資源。完整的CUnitQueue機(jī)制暫時(shí)先不去仔細(xì)分析。
然后就是取到了CUnit的情況,則先通過CChannel接收一個(gè)包,并根據(jù)包的內(nèi)容進(jìn)行包的dispatch。不能跑偏了,這里主要關(guān)注目標(biāo)SocketID為0,pkttype為0的包的dispatch??梢钥吹?#xff0c;在Listener存在的情況下,是dispatch給了listener,也就是Listening的UDT Socket的CUDT的listen()函數(shù),否則會dispatch給通道上處于Rendezvous模式的UDT Socket。(在 UDT協(xié)議實(shí)現(xiàn)分析——bind、listen與accept 一文中關(guān)于listen的部分有具體理過這個(gè)listener的設(shè)置過程。)可以看到,對于相同的通道CChannel,也就是同一個(gè)端口上,Rendezvous模式下的UDT Socket和Listening的UDT Socket不能共存,或者說同時(shí)存在時(shí),Rendezvous的行為可能不是預(yù)期的,但多個(gè)處于Rendezvous模式下的UDT Socket可以共存。
接收隊(duì)列CRcvQueue的worker()線程做的其它事情,暫時(shí)先不去仔細(xì)看。這里先來理一下Listening的UDT Socket在接收到Handshake消息的處理過程,也就是CUDT::listen(sockaddr* addr, CPacket& packet)(src/core.cpp):
int CUDT::listen(sockaddr* addr, CPacket& packet)
{
   if (m_bClosing)
      return 1002;

   if (packet.getLength() != CHandShake::m_iContentSize)
      return 1004;

   CHandShake hs;
   hs.deserialize(packet.m_pcData, packet.getLength());

   // SYN cookie
   char clienthost[NI_MAXHOST];
   char clientport[NI_MAXSERV];
   getnameinfo(addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST | NI_NUMERICSERV);
   int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000;  // secret changes every one minute
   stringstream cookiestr;
   cookiestr << clienthost << ":" << clientport << ":" << timestamp;
   unsigned char cookie[16];
   CMD5::compute(cookiestr.str().c_str(), cookie);

   if (1 == hs.m_iReqType)
   {
      hs.m_iCookie = *(int*) cookie;
      packet.m_iID = hs.m_iID;
      int size = packet.getLength();
      hs.serialize(packet.m_pcData, size);
      m_pSndQueue->sendto(addr, packet);
      return 0;
   }
   else
   {
      if (hs.m_iCookie != *(int*) cookie)
      {
         timestamp--;
         cookiestr << clienthost << ":" << clientport << ":" << timestamp;
         CMD5::compute(cookiestr.str().c_str(), cookie);

         if (hs.m_iCookie != *(int*) cookie)
            return -1;
      }
   }

   int32_t id = hs.m_iID;

   // When a peer side connects in...
   if ((1 == packet.getFlag()) && (0 == packet.getType()))
   {
      if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
      {
         // mismatch, reject the request
         hs.m_iReqType = 1002;
         int size = CHandShake::m_iContentSize;
         hs.serialize(packet.m_pcData, size);
         packet.m_iID = id;
         m_pSndQueue->sendto(addr, packet);
      }
      else
      {
         int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
         if (result == -1)
            hs.m_iReqType = 1002;

         // send back a response if connection failed or connection already existed
         // new connection response should be sent in connect()
         if (result != 1)
         {
            int size = CHandShake::m_iContentSize;
            hs.serialize(packet.m_pcData, size);
            packet.m_iID = id;
            m_pSndQueue->sendto(addr, packet);
         }
         else
         {
            // a new connection has been created, enable epoll for write
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
         }
      }
   }

   return hs.m_iReqType;
}

This function mainly does the following:
Check the UDT socket's state: if it is in the closing state, return; otherwise continue.
檢查包的數(shù)據(jù)部分長度。若長度不為CHandShake::m_iContentSize 48字節(jié),則說明這不是一個(gè)有效的Handshake,則返回,否則繼續(xù)執(zhí)行。
創(chuàng)建一個(gè)CHandShake hs,并將傳入的packet的數(shù)據(jù)部分反序列化進(jìn)這個(gè)CHandShake。這里來掃一眼這個(gè)CHandShake::deserialize()(src/packet.cpp):
int CHandShake::deserialize(const char* buf, int size)
{
   if (size < m_iContentSize)
      return -1;

   int32_t* p = (int32_t*) buf;
   m_iVersion = *p++;
   m_iType = *p++;
   m_iISN = *p++;
   m_iMSS = *p++;
   m_iFlightFlagSize = *p++;
   m_iReqType = *p++;
   m_iID = *p++;
   m_iCookie = *p++;
   for (int i = 0; i < 4; ++i)
      m_piPeerIP[i] = *p++;

   return 0;
}

Like its inverse serialize(), this function does not deal with byte order; again, that conversion happens in CChannel.
計(jì)算cookie值。所謂cookie值,即由連接發(fā)起端的網(wǎng)絡(luò)地址(包括IP地址與端口號)及時(shí)間戳組成的字符串計(jì)算出來的16個(gè)字節(jié)長度的MD5值。時(shí)間戳精確到分鐘值。用于計(jì)算MD5值的字符串類似127.0.0.1:49033:0。
計(jì)算出來cookie值之后的部分,應(yīng)該被分成兩個(gè)部分。一部分處理連接發(fā)起端發(fā)送的地一個(gè)握手包,也就是hs.m_iReqType == 1的block,在CUDT::connect()中構(gòu)造m_ConnReq的部分我們有看到這個(gè)值要被設(shè)為1的;另一部分則處理連接發(fā)起端發(fā)送的第二個(gè)握手消息。這里我們先來看hs.m_iReqType == 1的block。
它取前一步計(jì)算的cookie的前4個(gè)字節(jié),直接將其強(qiáng)轉(zhuǎn)為一個(gè)int值,賦給前面反序列化的CHandShake的m_iCookie。這個(gè)地方竟然顧及字節(jié)序的問題,也沒有顧及不同平臺的差異,即int類型的長度在不同的機(jī)器上可能不同,這個(gè)地方用int32_t似乎要更安全一點(diǎn)。將CHandShake的m_iID,如我們在CUDT::connect()中構(gòu)造m_ConnReq的部分我們有看到的,為連接發(fā)起端UDT Socket的SocketID,設(shè)置給packet的m_iID,也就是包的目標(biāo)SocketID。再將hs重新序列化進(jìn)packet。通過發(fā)送隊(duì)列SndQueue發(fā)送經(jīng)過了這一番修改的packet。然后返回。
總結(jié)一下UDT Server中Listening的UDT Socket接收到第一個(gè)HandShake包時(shí),對于這個(gè)包的處理過程:
計(jì)算一個(gè)cookie值,設(shè)置給接收到的HandShake的cookie字段,修改包的目標(biāo)SocketID字段為發(fā)起連接的UDT Socket的SocketID,包的其它部分原封不動,最后將這個(gè)包重新發(fā)回給連接發(fā)起端。
UDT Client發(fā)送第二個(gè)HandShake消息
Having received the first handshake message, the UDT server replies with a handshake message of its own, kicking the ball back to the UDT client. Let's continue with how the client handles the response to its first handshake packet.
We know that in CUDT::connect(const sockaddr* serv_addr), after sending the first handshake packet, the client calls CRcvQueue::recvfrom() to wait for the server's response, the consumer anxiously awaiting its food. When the message arrives, CUDT::connect() is woken by the producer, the CRcvQueue worker thread. Let's look at the other half of this producer-consumer story, the production side: the message dispatch in the CRcvQueue worker thread.
In the dispatch part of CRcvQueue::worker() we can see:
else if (id > 0)
{
   if (NULL != (u = self->m_pHash->lookup(id)))
   {
      if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
      {
         cout << "Receive packet by m_pHash table" << endl;
         if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
         {
            if (0 == unit->m_Packet.getFlag())
               u->processData(unit);
            else
               u->processCtrl(unit->m_Packet);

            u->checkTimers();
            self->m_pRcvUList->update(u);
         }
      }
   }
   else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
   {
      cout << "Receive packet by m_pRendezvousQueue, u->m_bSynRecving " << u->m_bSynRecving << endl;
      if (!u->m_bSynRecving)
         u->connect(unit->m_Packet);
      else
         self->storePkt(id, unit->m_Packet.clone());
   }
}

We know that the message the UDT server sends back has its destination SocketID set, so it takes the id > 0 branch.
And as we saw, CUDT::connect(const sockaddr* serv_addr) called m_pRcvQueue->registerConnector() to add the CUDT to the receive queue's m_pRendezvousQueue, so execution here takes the second block inside the id > 0 branch, the m_pRendezvousQueue lookup.
如果前面對于m_bSynRecving的分析,默認(rèn)情況為true。因而這個(gè)地方會執(zhí)行CRcvQueue::storePkt()來存儲包。來看這個(gè)函數(shù)的實(shí)現(xiàn):
void CRcvQueue::storePkt(int32_t id, CPacket* pkt)
{
   CGuard bufferlock(m_PassLock);

   map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

   if (i == m_mBuffer.end())
   {
      m_mBuffer[id].push(pkt);

#ifndef WIN32
      pthread_cond_signal(&m_PassCond);
#else
      SetEvent(m_PassCond);
#endif
   }
   else
   {
      //avoid storing too many packets, in case of malfunction or attack
      if (i->second.size() > 16)
         return;

      i->second.push(pkt);
   }
}

This function stores the received packet and, when necessary, wakes the thread waiting to receive it (compare with the logic of CRcvQueue::recvfrom()).
然后來看CUDT::connect(const sockaddr* serv_addr)在收到第一個(gè)HandShake消息的響應(yīng)之后會做什么樣的處理,也就是CUDT::connect(const CPacket& response)(src/core.cpp):
int CUDT::connect(const CPacket& response) throw ()
{
   // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
   // returning -1 means there is an error.
   // returning 1 or 2 means the connection is in process and needs more handshake

   if (!m_bConnecting)
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      //a data packet or a keep-alive packet comes, which means the peer side is already connected
      // in this situation, the previously recorded response will be used
      goto POST_CONNECT;
   }

   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());

   if (m_bRendezvous)
   {
      // regular connect should NOT communicate with rendezvous connect
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

This function handles both the response to the first handshake and the response to the second. Here we focus on the former, so only the relevant part of the code is listed.
這個(gè)函數(shù)先是檢查了CUDT的狀態(tài),檢查了packet的有效性,然后就是將接收到的包的數(shù)據(jù)部分反序列化至CHandShake m_ConnRes中。我們不關(guān)注對于Rendezvous模式的處理。
It then checks m_ConnRes.m_iReqType: if it is 1, it sets m_ConnReq.m_iReqType to -1 and copies m_ConnRes.m_iCookie into m_ConnReq.m_iCookie, marking m_ConnReq as a valid second handshake packet. It also sets m_llLastReqTime to 0 so that, per our earlier analysis of CUDT::connect(const sockaddr* serv_addr), the second handshake now held in m_ConnReq is sent out as soon as possible.
這第二個(gè)HandShake,與第一個(gè)HandShake的差異僅僅在于有了有效的Cookie值,且請求類型ReqType為-1。其它則完全一樣。
How the UDT Server Handles the Second Handshake
UDT Client對于m_ConnReq的改變并不足以改變接收隊(duì)列中worker線程對這個(gè)包的dispatch規(guī)則,因而直接來看CUDT::listen(sockaddr* addr, CPacket& packet)中對于這第二個(gè)HandShake消息的處理。
接著前面對于這個(gè)函數(shù)的分析,接前面的第4步。
對于這第二個(gè)HandShake,它的ReqType自然不再是1了,而是-1。因而在計(jì)算完了cookie值之后,它會先驗(yàn)證一下HandShake包中的cookie值是否是有效的,如果無效,則直接返回。根據(jù)這個(gè)地方的邏輯,可以看到cookie的有效時(shí)間最長為2分鐘。
Check the packet's flag and type: if it is not a handshake packet, return immediately; otherwise continue.
檢查連接發(fā)起端IP的版本及Socket類型SockType與本地Listen的UDT Socket是否匹配。若不匹配,則將錯(cuò)誤碼1002放在發(fā)過來的HandShanke的ReqType字段中,設(shè)置packet的目標(biāo)SocketID為發(fā)起連接的SocketID,然后將這個(gè)包重新發(fā)回給UDT Client。
檢查之后,發(fā)現(xiàn)完全匹配的情況。調(diào)用CUDTUnited::newConnection()創(chuàng)建一個(gè)新的UDT Socket。若創(chuàng)建過程執(zhí)行失敗,則將錯(cuò)誤碼1002放在發(fā)過來的HandShanke的ReqType字段中。若創(chuàng)建成功,會設(shè)置發(fā)過來的packet的目標(biāo)SocketID為適當(dāng)?shù)闹?#xff0c;然后將同一個(gè)包再發(fā)送回UDT Client。CUDTUnited::newConnection()會適當(dāng)?shù)匦薷腍andShake packet的一些字段。若失敗在執(zhí)行s_UDTUnited.m_EPoll.update_events()。
Return hs.m_iReqType.
然后來看在CUDTUnited::newConnection()中是如何新建Socket的:
int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
   CUDTSocket* ns = NULL;
   CUDTSocket* ls = locate(listen);

   if (NULL == ls)
      return -1;

   // if this connection has already been processed
   if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN)))
   {
      if (ns->m_pUDT->m_bBroken)
      {
         // last connection from the "peer" address has been broken
         ns->m_Status = CLOSED;
         ns->m_TimeStamp = CTimer::getTime();

         CGuard::enterCS(ls->m_AcceptLock);
         ls->m_pQueuedSockets->erase(ns->m_SocketID);
         ls->m_pAcceptSockets->erase(ns->m_SocketID);
         CGuard::leaveCS(ls->m_AcceptLock);
      }
      else
      {
         // connection already exist, this is a repeated connection request
         // respond with existing HS information
         hs->m_iISN = ns->m_pUDT->m_iISN;
         hs->m_iMSS = ns->m_pUDT->m_iMSS;
         hs->m_iFlightFlagSize = ns->m_pUDT->m_iFlightFlagSize;
         hs->m_iReqType = -1;
         hs->m_iID = ns->m_SocketID;

         return 0;

         //except for this situation a new connection should be started
      }
   }

   // exceeding backlog, refuse the connection request
   if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
      return -1;

   try
   {
      ns = new CUDTSocket;
      ns->m_pUDT = new CUDT(*(ls->m_pUDT));
      if (AF_INET == ls->m_iIPversion)
      {
         ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in);
         ((sockaddr_in*) (ns->m_pSelfAddr))->sin_port = 0;
         ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in);
         memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in));
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*) (new sockaddr_in6);
         ((sockaddr_in6*) (ns->m_pSelfAddr))->sin6_port = 0;
         ns->m_pPeerAddr = (sockaddr*) (new sockaddr_in6);
         memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6));
      }
   }
   catch (...)
   {
      delete ns;
      return -1;
   }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = --m_SocketID;
   cout << "new CUDTSocket SocketID is " << ns->m_SocketID << " PeerID " << hs->m_iID << endl;
   CGuard::leaveCS(m_IDLock);

   ns->m_ListenSocket = listen;
   ns->m_iIPversion = ls->m_iIPversion;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;
   ns->m_PeerID = hs->m_iID;
   ns->m_iISN = hs->m_iISN;

   int error = 0;

   try
   {
      // bind to the same addr of listening socket
      ns->m_pUDT->open();
      updateMux(ns, ls);
      ns->m_pUDT->connect(peer, hs);
   }
   catch (...)
   {
      error = 1;
      goto ERR_ROLLBACK;
   }

   ns->m_Status = CONNECTED;

   // copy address information of local node
   ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
   CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;
      m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 2;
   }
   CGuard::leaveCS(m_ControlLock);

   CGuard::enterCS(ls->m_AcceptLock);
   try
   {
      ls->m_pQueuedSockets->insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 3;
   }
   CGuard::leaveCS(ls->m_AcceptLock);

   // acknowledge users waiting for new connections on the listening socket
   m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);

   CTimer::triggerEvent();

ERR_ROLLBACK:
   if (error > 0)
   {
      ns->m_pUDT->close();
      ns->m_Status = CLOSED;
      ns->m_TimeStamp = CTimer::getTime();

      return -1;
   }

   // wake up a waiting accept() call
#ifndef WIN32
   pthread_mutex_lock(&(ls->m_AcceptLock));
   pthread_cond_signal(&(ls->m_AcceptCond));
   pthread_mutex_unlock(&(ls->m_AcceptLock));
#else
   SetEvent(ls->m_AcceptCond);
#endif

   return 1;
}

This function does the following:
Locate the CUDTSocket structure of the listening UDT socket; if it cannot be found, return -1. Otherwise continue.
Check whether the same connection request has already been processed. CUDTUnited keeps a dedicated cache, m_PeerRec, of UDT sockets created by listening sockets; the check is done by looking up whether a socket corresponding to this connection request already exists there:
如果已經(jīng)為這個(gè)connection請求創(chuàng)建了UDT Socket,又分為兩種情況:
(1) The UDT socket created for this request is still good and usable. The received handshake is then populated from fields of the previously created socket: m_iReqType is set to -1 and m_iID to that socket's SocketID, and 0 is returned. As we saw in CUDT::listen(), after such a return the response message is sent back to the UDT client from there.
(2) The UDT socket created for this request has gone bad and is no longer usable. In that case its status is set to CLOSED, its timestamp is set, and it is removed from m_pQueuedSockets and m_pAcceptSockets; the normal flow of creating a new UDT socket then proceeds.
但對于一個(gè)由Listening Socket創(chuàng)建的UDT Socket而言,又會是什么原因?qū)е滤幱赽roken狀態(tài)呢?此處這樣的檢查是否真有必要呢?后面會再來研究。
Check whether the size of m_pQueuedSockets exceeds the backlog configured for the listening UDT socket; if so, return -1, otherwise continue.
創(chuàng)建一個(gè)CUDTSocket對象。創(chuàng)建一個(gè)CUDT對象,這里創(chuàng)建的CUDT對象會繼承Listening的UDT Socket的許多屬性(src/api.cpp):
為SelfAddr分配內(nèi)存。
為PeerAddr分配內(nèi)存。
拷貝發(fā)送端地址到PeerAddr。
設(shè)置SocketID。等等。
這個(gè)函數(shù)里會根據(jù)HandShake包設(shè)置非常多的成員。但主要來關(guān)注m_pRcvQueue->setNewEntry(this),這個(gè)調(diào)用也是與RcvQueue的worker線程的消息dispatch機(jī)制有關(guān)。后面我們會再來仔細(xì)地了解這個(gè)函數(shù)。
這個(gè)函數(shù)會在最后發(fā)送響應(yīng)給UDT Client。
將UDT Socket的狀態(tài)置為CONNECTED??截怌hannel的地址到PeerAddr。
將創(chuàng)建的CUDTSocket放進(jìn)m_Sockets中,同時(shí)放進(jìn)m_PeerRec中。
將創(chuàng)建的UDT Socket放進(jìn)m_pQueuedSockets中。這正是Listening UDT Socket accept那個(gè)生產(chǎn)-消費(fèi)故事的另一半,這里是生產(chǎn)者。
Wake up the thread waiting in accept(). At this point, on the UDT server side, accept() returns a UDT socket, and the UDT server considers the connection successfully established.
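For completeness, here is the server-side calling sequence that this machinery serves, a minimal sketch modeled on app/appserver.cpp (port and error handling are illustrative only):

#include <arpa/inet.h>
#include <cstring>
#include <udt.h>

int run_server()
{
   UDT::startup();

   UDTSOCKET serv = UDT::socket(AF_INET, SOCK_STREAM, 0);

   sockaddr_in my_addr;
   my_addr.sin_family = AF_INET;
   my_addr.sin_port = htons(9000);  // the port used in the logs above
   my_addr.sin_addr.s_addr = INADDR_ANY;
   std::memset(&(my_addr.sin_zero), '\0', 8);

   UDT::bind(serv, (sockaddr*) &my_addr, sizeof(my_addr));
   UDT::listen(serv, 10);

   sockaddr_in clientaddr;
   int addrlen = sizeof(clientaddr);
   // Blocks until CUDTUnited::newConnection() queues the new socket and
   // signals m_AcceptCond, as described above.
   UDTSOCKET recver = UDT::accept(serv, (sockaddr*) &clientaddr, &addrlen);

   UDT::close(recver);
   UDT::close(serv);
   UDT::cleanup();
   return 0;
}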
The UDT Client Returns from UDT::connect()
As we saw earlier, after sending the second handshake message, CUDT::connect(const sockaddr* serv_addr) starts waiting for the server's second response. Once the UDT server has sent its response to the second handshake, the UDT client receives and processes it. The dispatch of this message is the same as for the response to the first handshake, so we won't repeat it. Here is the handling of the response to the second handshake, again in CUDT::connect(const CPacket& response):
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   // Remove from rendezvous queue
   m_pRcvQueue->removeConnector(m_SocketID);

   // Re-configure according to the negotiated values.
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iPeerISN = m_ConnRes.m_iISN;
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   // Prepare all data structures
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }

   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // And, I am connected too.
   m_bConnecting = false;
   m_bConnected = true;

   // register this socket for receiving data packets
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   // acknowledge the management module.
   s_UDTUnited.connect_complete(m_SocketID);

   // acknowledde any waiting epolls to write
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   return 0;
}

The first thing done here is to call m_pRcvQueue->removeConnector(m_SocketID), removing this socket from the receive queue's rendezvous queue to indicate that it will no longer receive rendezvous messages (src/queue.cpp):
void CRcvQueue::removeConnector(const UDTSOCKET& id)
{
   m_pRendezvousQueue->remove(id);

   CGuard bufferlock(m_PassLock);

   map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
   if (i != m_mBuffer.end())
   {
      while (!i->second.empty())
      {
         delete[] i->second.front()->m_pcData;
         delete i->second.front();
         i->second.pop();
      }
      m_mBuffer.erase(i);
   }
}

After this executes, the receive queue temporarily has no way to dispatch packets to this UDT socket.
根據(jù)協(xié)商的值重新做配置。這里我們可以再來看一下UDT的協(xié)商指的是什么。縱覽連接建立的整個(gè)過程,我們并沒有看到針對這些需要協(xié)商的值UDT本身有什么特殊的算法來計(jì)算,因而所謂的協(xié)商則主要是UDT Client端和UDT Server端,針對這些選項(xiàng),不同應(yīng)用程序?qū)硬煌O(shè)置的同步協(xié)調(diào)。
準(zhǔn)備所有的數(shù)據(jù)緩沖區(qū)。
設(shè)置CUDT的狀態(tài),m_bConnecting為false,m_bConnected為true。
執(zhí)行m_pRcvQueue->setNewEntry(this),注冊socket來接收數(shù)據(jù)包。這里來看一下CRcvQueue::setNewEntry(CUDT* u):
這個(gè)操作本身非常簡單。但把CUDT結(jié)構(gòu)放進(jìn)CRcvQueue之后,又會發(fā)生什么呢?回憶我們前面看到的CRcvQueue::worker(void* param)函數(shù)中循環(huán)開始部分的這段代碼:
// check waiting list, if new socket, insert it to the list
while (self->ifNewEntry())
{
   CUDT* ne = self->getNewEntry();
   if (NULL != ne)
   {
      self->m_pRcvUList->insert(ne);
      self->m_pHash->insert(ne->m_SocketID, ne);
   }
}

Compare this with the implementations of the functions it uses:
bool CRcvQueue::ifNewEntry()
{
   return !(m_vNewEntry.empty());
}

CUDT* CRcvQueue::getNewEntry()
{
   CGuard listguard(m_IDLock);

   if (m_vNewEntry.empty())
      return NULL;

   CUDT* u = (CUDT*) *(m_vNewEntry.begin());
   m_vNewEntry.erase(m_vNewEntry.begin());

   return u;
}

From this we can see that after m_pRcvQueue->setNewEntry(this) registers the socket, the CRcvQueue worker thread moves this CUDT structure from its m_vNewEntry into the other two containers, m_pRcvUList and m_pHash. And then? CRcvQueue::worker(void* param) also contains this piece:
if (NULL != (u = self->m_pHash->lookup(id)))
{
   if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
   {
      cout << "Receive packet by m_pHash table" << endl;
      if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
      {
         if (0 == unit->m_Packet.getFlag())
            u->processData(unit);
         else
            u->processCtrl(unit->m_Packet);

         u->checkTimers();
         self->m_pRcvUList->update(u);
      }
   }
}
else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
{

And that is it: CUDT::connect(const CPacket& response) completes a changeover in how the UDT socket receives messages, from the rendezvous-queue path to the hash-table path.
UDT Socket至此進(jìn)入CONNECTED狀態(tài)。
Done.
Summary

UDT connection setup is a four-message handshake carried over UDP: the client sends a handshake request (destination SocketID 0, ReqType 1), the server answers with a SYN cookie, the client resends the handshake carrying the cookie (ReqType -1), and the server creates the new UDT socket via CUDTUnited::newConnection() and responds, at which point both sides consider the connection established.