UDT Source Code Analysis (Part 3) -- UDT Socket Functions
- Creating and Using a UDT Socket
- Main Flow
- Client/Server Mode
- Rendezvous Mode
- UDT epoll
- UDT Socket Creation
- UDT socket setsockopt/getsockopt
- UDT socket bind
- UDT::listen
- UDT connect
- UDT accept
- Summary
- C/S Mode -- Four-Way Handshake
- Rendezvous Mode -- Three-Way Handshake
Creating and Using a UDT Socket
Main Flow
Client/Server Mode
- UDT::socket -> UDT::setsockopt -> UDT::connect -> UDT::send -> UDT::close
- UDT::socket -> UDT::setsockopt -> UDT::bind -> UDT::listen -> UDT::accept -> UDT::recv -> UDT::close
Rendezvous Mode
- UDT::setsockopt(usock, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool))
UDT epoll
The detailed source analysis appears in the next article.
- UDT::epoll_create -> UDT::epoll_add_usock/epoll_add_ssock -> UDT::epoll_wait/epoll_wait2 -> UDT::epoll_release
UDT Socket Creation
- UDT::socket -> CUDT::socket -> CUDTUnited::newSocket
The creation of a UDT socket proceeds in a few steps. As always, the code analysis starts from the externally exposed interface.
```cpp
UDTSOCKET CUDT::socket(int af, int type, int)
{
   if (!s_UDTUnited.m_bGCStatus)
      s_UDTUnited.startup();               // start the GC thread if it is not running yet

   return s_UDTUnited.newSocket(af, type); // create a UDT socket
}
```

Next is the creation of the UDT socket through newSocket.
```cpp
UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
   CUDTSocket* ns = NULL;

   try
   {
      ns = new CUDTSocket;     // new UDT socket
      ns->m_pUDT = new CUDT;   // the CUDT instance is created together with the socket
      if (AF_INET == af)
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
         ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);   // IPv6 is supported
         ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
      }
   }
   catch (...) { ... }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;   // decrement from the random initial value to get the UDT socket ID
   CGuard::leaveCS(m_IDLock);

   ns->m_Status = INIT;              // state starts at INIT
   ns->m_ListenSocket = 0;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;   // register the new UDT socket ID with the CUDT instance
   ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
   ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
   ns->m_pUDT->m_pCache = m_pCache;

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;   // add the UDT socket to the global m_Sockets map
   }
   catch (...)
   {
      //failure and rollback
      ...
   }
   CGuard::leaveCS(m_ControlLock);

   return ns->m_SocketID;
}
```

m_SocketID is initialized to a random value in the CUDTUnited constructor, which runs when the system is started up via startup().
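Before looking at the actual constructor, the ID scheme, a random starting value followed by a decrement per socket, can be sketched in isolation. The names `startup_ids` and `next_socket_id` below are illustrative, not UDT's, and the m_IDLock locking around the decrement is omitted.

```cpp
#include <cstdlib>
#include <ctime>

// Sketch of UDT's socket-ID allocation: the first ID is a random value in
// [1, 2^30], and every subsequent socket gets the previous ID minus one.
static int g_SocketID = 0;

void startup_ids()
{
   // Socket ID MUST start from a random value (same formula as CUDTUnited)
   srand((unsigned int)time(NULL));
   g_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
}

int next_socket_id()
{
   return --g_SocketID;   // what newSocket does under m_IDLock
}
```

Because IDs only ever decrease, a socket's ID also orders it by creation time within one process run.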
```cpp
CUDTUnited::CUDTUnited():
m_SocketID(0),
...
{
   // Socket ID MUST start from a random value
   srand((unsigned int)CTimer::getTime());
   m_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
}
```

UDT socket setsockopt/getsockopt
Setting and reading UDT socket options.
- CUDT::setsockopt -> CUDT::setOpt
- CUDT::getsockopt -> CUDT::getOpt
UDT's configurable options include a number of internally defined parameters:
```cpp
enum UDTOpt
{
   UDT_MSS,          // the Maximum Transfer Unit
   UDT_SNDSYN,       // if sending is blocking
   UDT_RCVSYN,       // if receiving is blocking
   UDT_CC,           // custom congestion control algorithm
   UDT_FC,           // Flight flag size (window size)
   UDT_SNDBUF,       // maximum buffer in sending queue
   UDT_RCVBUF,       // UDT receiving buffer size
   UDT_LINGER,       // waiting for unsent data when closing
   UDP_SNDBUF,       // UDP sending buffer size
   UDP_RCVBUF,       // UDP receiving buffer size
   UDT_MAXMSG,       // maximum datagram message size
   UDT_MSGTTL,       // time-to-live of a datagram message
   UDT_RENDEZVOUS,   // rendezvous connection mode
   UDT_SNDTIMEO,     // send() timeout
   UDT_RCVTIMEO,     // recv() timeout
   UDT_REUSEADDR,    // reuse an existing port or create a new one
   UDT_MAXBW,        // maximum bandwidth (bytes per second) that the connection can use
   UDT_STATE,        // current socket state, see UDTSTATUS, read only
   UDT_EVENT,        // current available events associated with the socket
   UDT_SNDDATA,      // size of data in the sending buffer
   UDT_RCVDATA       // size of data available for recv
};
```

UDT socket bind
- CUDT::bind -> CUDTUnited::bind
- Two overloads:
- int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
- int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
The UDT bind procedure touches several modules. In short, it registers the new UDT socket's information with a multiplexer, creating one if it does not exist yet. Each multiplexer serves exactly one port and owns a channel, which handles UDP socket creation, port binding, and so on; bind also moves the UDT socket's state from INIT to OPENED.
In other words, UDT bind ties the UDT socket to a multiplexer, CMultiplexer. The channel is the actual executor for the UDP socket, and two worker threads, one sending and one receiving, move the data. The send and receive queues belong to the multiplexer, but the multiplexer ID lets a UDT socket talk to the channel directly when sending, without looking up the multiplexer again.
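This one-multiplexer-per-port reuse can be sketched with a plain map. The `Multiplexer` struct and `attach` function below are illustrative stand-ins, not UDT's CMultiplexer API.

```cpp
#include <map>
#include <memory>

// Illustrative stand-in for UDT's CMultiplexer/CChannel pairing:
// one multiplexer (and thus one UDP channel) per local port.
struct Multiplexer { int port; };

std::map<int, std::shared_ptr<Multiplexer>> g_mux_by_port;

// Mimics what updateMux does: reuse the multiplexer already bound
// to this port, or create a new one for a first binding.
std::shared_ptr<Multiplexer> attach(int port)
{
   auto it = g_mux_by_port.find(port);
   if (it != g_mux_by_port.end())
      return it->second;          // port already served: share its multiplexer
   auto m = std::make_shared<Multiplexer>();
   m->port = port;
   g_mux_by_port[port] = m;       // first socket on this port creates the multiplexer
   return m;
}
```

Two sockets binding the same port end up attached to the same multiplexer object, which is exactly why a port can be shared.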
```cpp
int CUDTUnited::bind(const UDTSOCKET u, ...)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   s->m_pUDT->open();      // initialize the many fields inside m_pUDT
   updateMux(s, name);     // update the multiplexer
   s->m_Status = OPENED;   // move the UDT socket state to OPENED

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

   return 0;
}
```

Since one port may be shared by several UDT sockets, binding a port really means registering with that port's unique multiplexer:
- void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls)
- void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
After the channel is created, the send and receive queues are allocated, and each queue's init must be called as well; besides initializing some fields, init's main job is to spawn the worker thread.
Taking the send queue as an example (the receive queue and its concrete use are covered in later articles):
```cpp
void CSndQueue::init(CChannel* c, CTimer* t)
{
   m_pChannel = c;
   m_pTimer = t;
   m_pSndUList = new CSndUList;
   m_pSndUList->m_pWindowLock = &m_WindowLock;
   m_pSndUList->m_pWindowCond = &m_WindowCond;
   m_pSndUList->m_pTimer = m_pTimer;

   if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
   {
      m_WorkerThread = 0;
      throw CUDTException(3, 1);
   }
}
```

The send queue's worker thread:
```cpp
void* CSndQueue::worker(void* param)
{
   CSndQueue* self = (CSndQueue*)param;

   while (!self->m_bClosing)   // loop for as long as the queue is alive
   {
      uint64_t ts = self->m_pSndUList->getNextProcTime();

      if (ts > 0)
      {
         // wait until next processing time of the first socket on the list
         uint64_t currtime;
         CTimer::rdtsc(currtime);
         if (currtime < ts)              // not due yet: keep waiting
            self->m_pTimer->sleepto(ts);

         // it is time to send the next pkt
         sockaddr* addr;
         CPacket pkt;
         if (self->m_pSndUList->pop(addr, pkt) < 0)
            continue;

         self->m_pChannel->sendto(addr, pkt);
      }
      else
      {
         // wait here if there is no sockets with data to be sent
         pthread_mutex_lock(&self->m_WindowLock);
         if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
            pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);   // conditional wait
         pthread_mutex_unlock(&self->m_WindowLock);
      }
   }

   return NULL;
}
```

UDT::listen
- CUDT::listen -> CUDTUnited::listen
UDT listen starts listening on the port once the UDT socket is in the OPENED state; as seen in the bind section, bind has already succeeded by then. A port can have only one listening socket. listen serves the client/server mode and is not supported in rendezvous mode. Calling listen on a UDT socket that is already listening does nothing.
After listen succeeds, m_bListening becomes true and the socket's state m_Status becomes LISTENING. Two sets are created on the UDT socket: m_pQueuedSockets for connection requests that have been received but not yet accepted, and m_pAcceptSockets for requests that have been accepted. Sets are used for their guarantee that each element is unique.
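The bookkeeping around these two sets can be sketched as follows. This is a simplified model: the real sets live on CUDTSocket and are guarded by m_AcceptLock, and the `Listener`, `enqueue`, and `pop_accept` names here are illustrative.

```cpp
#include <set>

typedef int UDTSOCKET;

// Simplified model of a listening socket's two sets.
struct Listener {
   unsigned backlog;
   std::set<UDTSOCKET> queued;     // received, not yet accepted (m_pQueuedSockets)
   std::set<UDTSOCKET> accepted;   // handed out by accept()     (m_pAcceptSockets)
};

// What newConnection does: refuse when the backlog is full, else queue.
bool enqueue(Listener& ls, UDTSOCKET u)
{
   if (ls.queued.size() >= ls.backlog)
      return false;                // exceeding backlog: refuse the request
   ls.queued.insert(u);            // set membership keeps u unique
   return true;
}

// What accept() does with a pending connection: move queued -> accepted.
UDTSOCKET pop_accept(Listener& ls)
{
   UDTSOCKET u = *ls.queued.begin();
   ls.accepted.insert(u);
   ls.queued.erase(ls.queued.begin());
   return u;
}
```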
In effect, listen installs the listener on the multiplexer's receive queue m_pRcvQueue. The queue's worker thread then reacts to incoming packets according to their type and sends the responses.
```cpp
int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // do nothing if the socket is already listening
   if (LISTENING == s->m_Status)
      return 0;

   // a socket can listen only if is in OPENED status
   if (OPENED != s->m_Status)
      throw CUDTException(5, 5, 0);

   // listen is not supported in rendezvous connection setup
   if (s->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   if (backlog <= 0)
      throw CUDTException(5, 3, 0);

   s->m_uiBackLog = backlog;

   try
   {
      // create the set of received-but-not-yet-accepted sockets and the set of
      // accepted sockets; using set guarantees each socket appears only once
      s->m_pQueuedSockets = new set<UDTSOCKET>;
      s->m_pAcceptSockets = new set<UDTSOCKET>;
   }
   catch (...) { ... }

   s->m_pUDT->listen();
   s->m_Status = LISTENING;

   return 0;
}
```

Setting the listener on the UDT instance through the receive queue in fact registers it with the multiplexer:
```cpp
void CUDT::listen()
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // listen can be called more than once
   if (m_bListening)
      return;

   // if there is already another socket listening on the same port
   if (m_pRcvQueue->setListener(this) < 0)   // set the CRcvQueue listener to this
      throw CUDTException(5, 11, 0);

   m_bListening = true;   // record the listening state
}
```

UDT listen thus installs a Listener on the multiplexer; when data arrives, it is dispatched to the corresponding UDT instance, and the UDT's current state is updated.
After setListener, the listener pointer is non-NULL; in the CRcvQueue worker loop, recvfrom receives the connection request and m_Packet.m_iID is checked to decide whether connect should be called.
```cpp
void* CRcvQueue::worker(void* param)
{
   CRcvQueue* self = (CRcvQueue*)param;

   sockaddr* addr = ...
   CUDT* u = NULL;
   int32_t id;

   while (!self->m_bClosing)
   {
      unit->m_Packet.setLength(self->m_iPayloadSize);

      // reading next incoming packet, recvfrom returns -1 is nothing has been received
      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
         goto TIMER_CHECK;

      id = unit->m_Packet.m_iID;

      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
      if (0 == id)
      {
         if (NULL != self->m_pListener)
            self->m_pListener->listen(addr, unit->m_Packet);
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {
            // asynchronous connect: call connect here
            // otherwise wait for the UDT socket to retrieve this packet
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }
   }
}
```

When a connection request arrives and the listener is non-NULL, the private CUDT::listen is invoked. It parses the incoming handshake packet and generates a cookie string. For a regular connection request it sends the reply through the send queue's sendto; for a response message that passes the cookie check, it establishes a new connection.
```cpp
int CUDT::listen(sockaddr* addr, CPacket& packet)
{
   CHandShake hs;
   hs.deserialize(packet.m_pcData, packet.getLength());

   // SYN cookie
   char clienthost[NI_MAXHOST];
   char clientport[NI_MAXSERV];
   getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6),
               clienthost, sizeof(clienthost), clientport, sizeof(clientport),
               NI_NUMERICHOST|NI_NUMERICSERV);
   int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000;   // secret changes every one minute
   stringstream cookiestr;
   cookiestr << clienthost << ":" << clientport << ":" << timestamp;
   unsigned char cookie[16];
   CMD5::compute(cookiestr.str().c_str(), cookie);

   // connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: response
   if (1 == hs.m_iReqType)
   {
      hs.m_iCookie = *(int*)cookie;
      packet.m_iID = hs.m_iID;
      int size = packet.getLength();
      hs.serialize(packet.m_pcData, size);
      m_pSndQueue->sendto(addr, packet);
      return 0;
   }
   else
   {
      if (hs.m_iCookie != *(int*)cookie)
      {
         // retry with the previous minute's secret before rejecting
         timestamp --;
         cookiestr << clienthost << ":" << clientport << ":" << timestamp;
         CMD5::compute(cookiestr.str().c_str(), cookie);

         if (hs.m_iCookie != *(int*)cookie)
            return -1;
      }
   }

   int32_t id = hs.m_iID;

   // When a peer side connects in...
   if ((1 == packet.getFlag()) && (0 == packet.getType()))
   {
      // a control packet whose type is Connection Handshake
      if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
      {
         // mismatch, reject the request
         hs.m_iReqType = 1002;
         int size = CHandShake::m_iContentSize;
         hs.serialize(packet.m_pcData, size);
         packet.m_iID = id;
         m_pSndQueue->sendto(addr, packet);
      }
      else
      {
         int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
         if (result == -1)
            hs.m_iReqType = 1002;

         // send back a response if connection failed or connection already existed
         // new connection response should be sent in connect()
         if (result != 1)
         {
            int size = CHandShake::m_iContentSize;
            hs.serialize(packet.m_pcData, size);
            packet.m_iID = id;
            m_pSndQueue->sendto(addr, packet);
         }
         else
         {
            // a new connection has been created, enable epoll for write
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
         }
      }
   }

   return hs.m_iReqType;
}
```

In the code above, newConnection's main job is to establish a new connection: it adds the peer's information to m_PeerRec, the map of peer connection records inside UDT, and inserts the newly created UDT socket into m_Sockets.
It first checks whether this connection has already been established; if so, the existing UDT socket's information is returned. If that socket is currently BROKEN, its state is changed to CLOSED and some cleanup is done; the remaining cleanup happens during the UDT shutdown path.
For a genuinely new connection, it creates a new UDT socket and initializes some of its fields, including m_pSelfAddr, m_SocketID, m_ListenSocket, m_PeerID, and m_iISN, then binds the new socket to the listening socket's address and sets the state to CONNECTED. It updates m_PeerRec and m_Sockets, inserts the socket into m_pQueuedSockets, refreshes the local node information, and updates the epoll events and the timer, waiting for an accept to arrive.
```cpp
int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
   CUDTSocket* ns = NULL;
   CUDTSocket* ls = locate(listen);   // look up the local UDT socket in m_Sockets

   if (NULL == ls)
      return -1;

   // if this connection has already been processed
   if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN)))   // look up in m_PeerRec and m_Sockets
   {
      if (ns->m_pUDT->m_bBroken)
      {
         // last connection from the "peer" address has been broken
         ...
      }
      else
      {
         // connection already exist, this is a repeated connection request
         // respond with existing HS information
         ...
         return 0;
         //except for this situation a new connection should be started
      }
   }

   // exceeding backlog, refuse the connection request
   if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
      return -1;

   try
   {
      ns = new CUDTSocket;
      ns->m_pUDT = new CUDT(*(ls->m_pUDT));
      ...
      ns->m_pSelfAddr = ...
      ...
   }
   catch (...) { ... }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;
   CGuard::leaveCS(m_IDLock);

   ns->m_ListenSocket = listen;
   ns->m_iIPversion = ls->m_iIPversion;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;
   ns->m_PeerID = hs->m_iID;
   ns->m_iISN = hs->m_iISN;

   int error = 0;

   try
   {
      // bind to the same addr of listening socket
      ns->m_pUDT->open();
      updateMux(ns, ls);
      ns->m_pUDT->connect(peer, hs);
   }
   catch (...)
   {
      error = 1;
      goto ERR_ROLLBACK;
   }

   ns->m_Status = CONNECTED;

   // copy address information of local node
   ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
   CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;
      m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 2;
   }
   CGuard::leaveCS(m_ControlLock);

   CGuard::enterCS(ls->m_AcceptLock);
   try
   {
      ls->m_pQueuedSockets->insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 3;
   }
   CGuard::leaveCS(ls->m_AcceptLock);

   // acknowledge users waiting for new connections on the listening socket
   m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);

   CTimer::triggerEvent();

ERR_ROLLBACK:
   if (error > 0)
   {
      ns->m_pUDT->close();
      ns->m_Status = CLOSED;
      ns->m_TimeStamp = CTimer::getTime();
      return -1;
   }

   // wake up a waiting accept() call
   #ifndef WIN32
      pthread_mutex_lock(&(ls->m_AcceptLock));
      pthread_cond_signal(&(ls->m_AcceptCond));
      pthread_mutex_unlock(&(ls->m_AcceptLock));
   #else
      SetEvent(ls->m_AcceptCond);
   #endif

   return 1;
}
```

UDT connect
- CUDT::connect( api.cpp, CUDT::public method) -> CUDTUnited::connect -> CUDT::connect( core.cpp, CUDT::private method)
For a UDT socket to connect, it must be in the INIT or OPENED state. In INIT (a freshly created UDT socket), the fields inside m_pUDT are initialized and the socket is registered with a multiplexer, moving the state to OPENED. In OPENED (possibly after a bind), it can enter CONNECTING and call m_pUDT->connect. The peer address is recorded in the socket's internal m_pPeerAddr.
```cpp
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // a socket can "connect" only if it is in INIT or OPENED status
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)
      {
         s->m_pUDT->open();
         updateMux(s);
         s->m_Status = OPENED;
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);

   // connect_complete() may be called before connect() returns.
   // So we need to update the status before connect() is called,
   // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);
   }
   catch (CUDTException e)
   {
      s->m_Status = OPENED;
      throw e;
   }

   // record peer address
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}
```

Within CUDT, the private connect comes in three forms:
```cpp
// Functionality:
//    Connect to a UDT entity listening at address "peer".
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
// Returned value:
//    None.
void CUDT::connect(const sockaddr* serv_addr)

// Functionality:
//    Process the response handshake packet.
// Parameters:
//    0) [in] pkt: handshake packet.
// Returned value:
//    Return 0 if connected, positive value if connection is in progress, otherwise error code.
int CUDT::connect(const CPacket& response) throw ()

// Functionality:
//    Connect to a UDT entity listening at address "peer", which has sent "hs" request.
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
//    1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out).
// Returned value:
//    None.
void CUDT::connect(const sockaddr* peer, CHandShake* hs)
```

The connect invoked above takes a sockaddr, i.e. the first form; it is also the function reached from the public UDT::connect:
```cpp
void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)                      // the UDT socket must be in the OPENED state
      throw CUDTException(5, 0, 0);

   if (m_bListening)                    // cannot listen and connect at the same time
      throw CUDTException(5, 2, 0);

   if (m_bConnecting || m_bConnected)   // must not have connected before
      throw CUDTException(5, 2, 0);

   m_bConnecting = true;                // flag the state to prevent repeated connects

   // record peer/server address
   delete m_pPeerAddr;
   m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
   memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

   // register this socket in the rendezvous queue
   // RendezevousQueue is used to temporarily store incoming handshake, non-rendezvous connections also require this function
   uint64_t ttl = 3000000;   // this is also why close() has to wait an extra 3 s
   if (m_bRendezvous)
      ttl *= 10;
   ttl += CTimer::getTime();
   // insert the UDT socket into the m_lRendezvousID list for temporary storage;
   // this happens regardless of rendezvous mode, which can be misleading
   m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);

   // This is my current configurations
   m_ConnReq.m_iVersion = m_iVersion;
   m_ConnReq.m_iType = m_iSockType;
   m_ConnReq.m_iMSS = m_iMSS;
   m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize) ? m_iRcvBufSize : m_iFlightFlagSize;
   m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
   m_ConnReq.m_iID = m_SocketID;
   CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

   // Random Initial Sequence Number
   srand((unsigned int)CTimer::getTime());
   m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

   m_iLastDecSeq = m_iISN - 1;
   m_iSndLastAck = m_iISN;
   m_iSndLastDataAck = m_iISN;
   m_iSndCurrSeqNo = m_iISN - 1;
   m_iSndLastAck2 = m_iISN;
   m_ullSndLastAck2Time = CTimer::getTime();

   // Inform the server my configurations.
   CPacket request;
   char* reqdata = new char [m_iPayloadSize];
   request.pack(0, NULL, reqdata, m_iPayloadSize);   // build the packet
   // ID = 0, connection request
   request.m_iID = 0;

   int hs_size = m_iPayloadSize;
   m_ConnReq.serialize(reqdata, hs_size);    // write the request
   request.setLength(hs_size);
   m_pSndQueue->sendto(serv_addr, request);  // send the request
   m_llLastReqTime = CTimer::getTime();      // update the timer

   // asynchronous connect, return immediately
   if (!m_bSynRecving)
   {
      delete [] reqdata;
      return;
   }

   // Wait for the negotiated configurations from the peer side.
   CPacket response;
   char* resdata = new char [m_iPayloadSize];
   response.pack(0, NULL, resdata, m_iPayloadSize);

   CUDTException e(0, 0);

   while (!m_bClosing)   // wait for connect to finish, at most 3 s; resend the request if there is no response
   {
      // avoid sending too many requests, at most 1 request per 250ms
      if (CTimer::getTime() - m_llLastReqTime > 250000)
      {
         m_ConnReq.serialize(reqdata, hs_size);
         request.setLength(hs_size);
         if (m_bRendezvous)
            request.m_iID = m_ConnRes.m_iID;
         m_pSndQueue->sendto(serv_addr, request);
         m_llLastReqTime = CTimer::getTime();
      }

      response.setLength(m_iPayloadSize);
      if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
      {
         if (connect(response) <= 0)
            break;

         // new request/response should be sent out immediately on receiving a response
         m_llLastReqTime = 0;
      }

      if (CTimer::getTime() > ttl)
      {
         // timeout
         e = CUDTException(1, 1, 0);
         break;
      }
   }

   delete [] reqdata;
   delete [] resdata;

   if (e.getErrorCode() == 0)
   {
      if (m_bClosing)                                             // if the socket is closed before connection...
         e = CUDTException(1);
      else if (1002 == m_ConnRes.m_iReqType)                      // connection request rejected
         e = CUDTException(1, 2, 0);
      else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))  // security check
         e = CUDTException(1, 4, 0);
   }

   if (e.getErrorCode() != 0)
      throw e;
}
```

After the connection request is sent, a synchronous connect waits for the return with a 3-second timeout, resending the request every 250 ms until a response arrives. Once a response is received, connect(response) is called, i.e. the second connect form.
The connection setup mirrors TCP's notion of half-connections: this second connect completes the second half and performs the final negotiation. For non-rendezvous mode, the socket is removed from m_lRendezvousID and all connection parameters are re-configured from the negotiated values. The UDT socket's data structures are then built, including the send and receive buffers, loss lists, and windows, which are the internal structures used during data transfer and serve UDT's core transfer algorithms such as congestion avoidance and retransmission; the congestion-control parameters are initialized accordingly. Finally the state is set to connected, and connect_complete plus update_events notify the management module and epoll of the state change.
```cpp
int CUDT::connect(const CPacket& response) throw ()
{
   // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
   // returning -1 means there is an error.
   // returning 1 or 2 means the connection is in process and needs more handshake

   if (!m_bConnecting)
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      //a data packet or a keep-alive packet comes, which means the peer side is already connected
      // in this situation, the previously recorded response will be used
      goto POST_CONNECT;
   }

   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());

   if (m_bRendezvous)
   {
      // regular connect should NOT communicate with rendezvous connect
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   // Remove from rendezvous queue
   m_pRcvQueue->removeConnector(m_SocketID);

   // Re-configure according to the negotiated values.
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iPeerISN = m_ConnRes.m_iISN;
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   // Prepare all data structures
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }

   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // And, I am connected too.
   m_bConnecting = false;
   m_bConnected = true;

   // register this socket for receiving data packets
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   // acknowledge the management module.
   s_UDTUnited.connect_complete(m_SocketID);   // refresh the local node info in m_pSndQueue->m_pChannel, set the state to CONNECTED

   // acknowledge any waiting epolls to write
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   return 0;
}
```

UDT accept
- UDT::accept -> CUDT::accept -> CUDTUnited::accept
Like accept on a TCP socket, UDT accept waits for incoming connections once the socket has been bound and is listening, so the UDT listener must be in the LISTENING state. It is only used in non-rendezvous mode.
The core is a while loop waiting for an accept event. When one arrives, the socket is removed from m_pQueuedSockets and inserted into m_pAcceptSockets, update_events pushes the event update to epoll, and the peer address is stored.
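The wait-for-connection pattern in this loop can be sketched with standard primitives. This is a simplified model with illustrative names; the real code uses pthread_cond_wait on m_AcceptCond under m_AcceptLock, as shown below it.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

typedef int UDTSOCKET;

// Simplified accept queue: the listener pushes and signals on a new
// connection, accept() blocks until a pending connection exists.
struct AcceptQueue {
   std::mutex lock;                 // plays the role of m_AcceptLock
   std::condition_variable cond;    // plays the role of m_AcceptCond
   std::deque<UDTSOCKET> queued;    // plays the role of m_pQueuedSockets

   void push(UDTSOCKET u)           // called on a new incoming connection
   {
      std::lock_guard<std::mutex> g(lock);
      queued.push_back(u);
      cond.notify_one();            // wake one waiting accept()
   }

   UDTSOCKET pop()                  // blocking accept()
   {
      std::unique_lock<std::mutex> g(lock);
      while (queued.empty())
         cond.wait(g);              // same shape as the pthread_cond_wait loop
      UDTSOCKET u = queued.front();
      queued.pop_front();
      return u;
   }
};
```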
```cpp
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
{
   CUDTSocket* ls = locate(listen);

   // the "listen" socket must be in LISTENING status
   if (LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // no "accept" in rendezvous connection setup
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one conection can be set up each time!!
   #ifndef WIN32
      while (!accepted)   // wait until a connection arrives; exit when accepted == true
      {
         pthread_mutex_lock(&(ls->m_AcceptLock));

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // This socket has been closed.
            accepted = true;
         }
         else if (ls->m_pQueuedSockets->size() > 0)
         {
            // update m_pAcceptSockets and m_pQueuedSockets
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
         {
            accepted = true;
         }

         if (!accepted && (LISTENING == ls->m_Status))
            pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);

         pthread_mutex_unlock(&(ls->m_AcceptLock));
      }
   #else
      while (!accepted)
      {
         WaitForSingleObject(ls->m_AcceptLock, INFINITE);

         if (ls->m_pQueuedSockets->size() > 0)
         {
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;

         ReleaseMutex(ls->m_AcceptLock);

         if (!accepted & (LISTENING == ls->m_Status))
            WaitForSingleObject(ls->m_AcceptCond, INFINITE);

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // Send signal to other threads that are waiting to accept.
            SetEvent(ls->m_AcceptCond);
            accepted = true;
         }

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
      }
   #endif

   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   // store the peer address
   if ((addr != NULL) && (addrlen != NULL))
   {
      if (AF_INET == locate(u)->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
   }

   return u;
}
```

Summary
UDT supports two connection modes: client/server mode and rendezvous mode. The UDT client sends a handshake message (a control packet of type 0) to the server or peer; the message format is described in the earlier article, UDT最新协议分析.
C/S Mode -- Four-Way Handshake
When a UDT socket acts as a server, it creates a UDT entity and, as the Listener, monitors the bound port. When a new connection request arrives, a new UDT socket is created, its fields are initialized, and its information is recorded with the Listener. This is how a connection gets established, and it closely resembles TCP's connection setup.
- The UDT server extracts the packet size and maximum window size from the handshake message, compares them with its own packet size and maximum window size, and adopts the smaller of each.
- The UDT server sends the resulting packet size and maximum window size back to the client, together with the server's version number and initial sequence number. To guard against packet loss, the server keeps sending responses if further handshake messages arrive from the same peer.
- The UDT server is ready to send and receive data.
- The UDT client receives the server's handshake packet and starts sending and receiving data; further handshake messages are no longer answered.
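The negotiation in the first step is a per-field minimum. A sketch, where the field names follow CHandShake but the `negotiate` helper itself is illustrative:

```cpp
#include <algorithm>

// The two negotiated handshake fields (after CHandShake).
struct HandShake {
   int m_iMSS;              // packet size (maximum segment size)
   int m_iFlightFlagSize;   // maximum window size
};

// The server adopts the smaller of each value proposed by the two sides.
HandShake negotiate(const HandShake& client, const HandShake& server)
{
   HandShake r;
   r.m_iMSS = std::min(client.m_iMSS, server.m_iMSS);
   r.m_iFlightFlagSize = std::min(client.m_iFlightFlagSize, server.m_iFlightFlagSize);
   return r;
}
```

Taking the minimum guarantees that neither side is ever asked to handle packets or windows larger than what it offered.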
Rendezvous Mode -- Three-Way Handshake
In rendezvous mode both ends act as clients and must call UDT::connect at the same time; this mode is mainly used for NAT traversal.
- If the incoming request type is 0, the response is sent with type -1.
- If the incoming request type is -1, the response is sent with type -2.
- If the incoming request type is -2, no response is sent.
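These transitions can be written down directly. A sketch of the rule above; 0, -1, and -2 are the rendezvous request types, while the `NO_REPLY` sentinel and `rendezvous_reply` name are illustrative, not UDT constants:

```cpp
// Rendezvous handshake: map an incoming request type to the reply type.
const int NO_REPLY = 1;   // illustrative sentinel, not a UDT constant

int rendezvous_reply(int req_type)
{
   switch (req_type) {
   case 0:  return -1;        // initial rendezvous request -> first response
   case -1: return -2;        // first response -> final response
   case -2: return NO_REPLY;  // final response: nothing more to send
   default: return NO_REPLY;  // unknown types are ignored in this sketch
   }
}
```

The chain 0 -> -1 -> -2 -> silence is exactly the three messages of the rendezvous three-way handshake.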