
UDT Source Code Analysis (Part 3) -- UDT Socket Functions

花阳辉
2023-12-01

UDT Socket Setup and Usage

Main Flow

C/S Mode

  • UDT::socket -> UDT::setsockopt -> UDT::connect -> UDT::send -> UDT::close
  • UDT::socket -> UDT::setsockopt -> UDT::bind -> UDT::listen -> UDT::accept -> UDT::recv -> UDT::close
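Before diving into the internals, here is a minimal sketch of the two call chains above: a blocking client and server pair. The loopback address, port 9000 and buffer sizes are illustrative assumptions, and error handling is omitted.

#include <arpa/inet.h>
#include <cstring>
#include <udt.h>

void run_client()
{
   UDT::startup();
   UDTSOCKET u = UDT::socket(AF_INET, SOCK_STREAM, 0);

   sockaddr_in serv;
   memset(&serv, 0, sizeof(serv));
   serv.sin_family = AF_INET;
   serv.sin_port = htons(9000);                     // assumed server port
   inet_pton(AF_INET, "127.0.0.1", &serv.sin_addr);

   UDT::connect(u, (sockaddr*)&serv, sizeof(serv));
   const char* msg = "hello";
   UDT::send(u, msg, (int)strlen(msg), 0);
   UDT::close(u);
   UDT::cleanup();
}

void run_server()
{
   UDT::startup();
   UDTSOCKET ls = UDT::socket(AF_INET, SOCK_STREAM, 0);

   sockaddr_in local;
   memset(&local, 0, sizeof(local));
   local.sin_family = AF_INET;
   local.sin_port = htons(9000);
   local.sin_addr.s_addr = INADDR_ANY;

   UDT::bind(ls, (sockaddr*)&local, sizeof(local));
   UDT::listen(ls, 10);

   sockaddr_in peer;
   int addrlen = sizeof(peer);
   UDTSOCKET conn = UDT::accept(ls, (sockaddr*)&peer, &addrlen);

   char buf[64];
   UDT::recv(conn, buf, sizeof(buf), 0);            // blocking receive by default
   UDT::close(conn);
   UDT::close(ls);
   UDT::cleanup();
}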

Rendezvous Mode

  • UDT::setsockopt(usock, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool))
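A short sketch of how this option is typically used (hedged; local_addr and peer_addr are sockaddr_in values prepared as in the C/S sketch above, and each peer uses the other's address as peer_addr):

// Both peers run the same code.
UDTSOCKET usock = UDT::socket(AF_INET, SOCK_STREAM, 0);

bool rendezvous = true;
UDT::setsockopt(usock, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));

UDT::bind(usock, (sockaddr*)&local_addr, sizeof(local_addr));   // rendezvous requires binding the local port first
UDT::connect(usock, (sockaddr*)&peer_addr, sizeof(peer_addr));  // both sides call connect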

UDT epoll

The detailed source code analysis appears in the next article.

  • UDT::epoll_create -> UDT::epoll_add_usock/epoll_add_ssock -> UDT::epoll_wait/epoll_wait2 -> UDT::epoll_release
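Although the detailed analysis is deferred, a small usage sketch of this chain may help. The 1000 ms timeout is an illustrative choice, and usock is assumed to be an already-connected UDT socket:

std::set<UDTSOCKET> readfds;

int eid = UDT::epoll_create();
UDT::epoll_add_usock(eid, usock);               // default events: IN, OUT and ERR

if (UDT::epoll_wait(eid, &readfds, NULL, 1000) > 0)
{
   // the sockets in readfds are readable; call UDT::recv/recvmsg on them
}

UDT::epoll_release(eid);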

UDT Socket Creation

  • UDT::socket -> CUDT::socket -> CUDTUnited::newSocket

The creation of a UDT socket consists of the following steps:

  1. If the GC thread has not been started, start it first;
  2. Create a new CUDTSocket and initialize its members: the newly allocated m_pUDT, the address information m_pSelfAddr, and the UDT socket identifier m_SocketID; set the socket status m_Status to INIT and m_ListenSocket to 0;
  3. Register the UDT socket's information into m_pUDT, including m_SocketID, m_iSockType, m_iIPversion and m_pCache;
  4. Insert the UDT socket into the global m_Sockets map;
  5. Return the identifier m_SocketID.

As usual, the analysis starts from the externally exposed API.

UDTSOCKET CUDT::socket(int af, int type, int)
{
   if (!s_UDTUnited.m_bGCStatus)
      s_UDTUnited.startup(); // start the GC thread first if it is not running

   return s_UDTUnited.newSocket(af, type); // create a UDT socket
}

Next is the process of creating the UDT socket in newSocket.

UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
   CUDTSocket* ns = NULL;

   try
   {
      ns = new CUDTSocket;   // create a new UDT socket
      ns->m_pUDT = new CUDT; // the CUDT instance is allocated when the socket is created
      if (AF_INET == af)
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
         ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6); // IPv6 is supported
         ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
      }
   }
   catch (...) { ... }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID; // decrement from the random initial value to obtain the UDT socket ID
   CGuard::leaveCS(m_IDLock);

   ns->m_Status = INIT;  // set the INIT state
   ns->m_ListenSocket = 0;
   ns->m_pUDT->m_SocketID = ns->m_SocketID; // register the newly obtained UDT socket ID in the CUDT instance
   ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM; 
   ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
   ns->m_pUDT->m_pCache = m_pCache;

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns; // insert the UDT socket into the global m_Sockets map
   }
   catch (...)
   {
      //failure and rollback
      ...
   }
   CGuard::leaveCS(m_ControlLock);

   return ns->m_SocketID;
}

m_SocketID is initialized to a random value in the CUDTUnited constructor, which is invoked when the library is initialized via startup.

CUDTUnited::CUDTUnited():
m_SocketID(0)
{
   // Socket ID MUST start from a random value
   srand((unsigned int)CTimer::getTime());
   m_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
}

UDT socket setsockopt/getsockopt

UDT socket option configuration.

  • CUDT::setsockopt -> CUDT::setOpt
  • CUDT::getsockopt -> CUDT::getOpt

The configurable UDT options include a number of library-defined parameters, declared as follows:

enum UDTOpt
{
   UDT_MSS,             // the Maximum Transfer Unit
   UDT_SNDSYN,          // if sending is blocking
   UDT_RCVSYN,          // if receiving is blocking
   UDT_CC,              // custom congestion control algorithm
   UDT_FC,              // Flight flag size (window size)
   UDT_SNDBUF,          // maximum buffer in sending queue
   UDT_RCVBUF,          // UDT receiving buffer size
   UDT_LINGER,          // waiting for unsent data when closing
   UDP_SNDBUF,          // UDP sending buffer size
   UDP_RCVBUF,          // UDP receiving buffer size
   UDT_MAXMSG,          // maximum datagram message size
   UDT_MSGTTL,          // time-to-live of a datagram message
   UDT_RENDEZVOUS,      // rendezvous connection mode
   UDT_SNDTIMEO,        // send() timeout
   UDT_RCVTIMEO,        // recv() timeout
   UDT_REUSEADDR,       // reuse an existing port or create a new one
   UDT_MAXBW,           // maximum bandwidth (bytes per second) that the connection can use
   UDT_STATE,           // current socket state, see UDTSTATUS, read only
   UDT_EVENT,           // current available events associated with the socket
   UDT_SNDDATA,         // size of data in the sending buffer
   UDT_RCVDATA          // size of data available for recv
};
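For example, a few of these options might be set and queried as follows (a hedged sketch; u is an open UDT socket, the values are illustrative, and the level argument is unused by UDT, conventionally passed as 0):

// set: enlarge the UDT send buffer and make recv() non-blocking
int sndbuf = 12058624;
UDT::setsockopt(u, 0, UDT_SNDBUF, &sndbuf, sizeof(int));

bool block = false;
UDT::setsockopt(u, 0, UDT_RCVSYN, &block, sizeof(bool));

// get: read back a read-only option such as the current socket state
int32_t state;
int len = sizeof(int32_t);
UDT::getsockopt(u, 0, UDT_STATE, &state, &len);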

UDT socket bind

  • CUDT::bind -> CUDTUnited::bind
  • Two forms:
    • int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
    • int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)

The bind process touches quite a few modules. In short, it registers the newly created UDT socket with a multiplexer, creating the multiplexer if it does not yet exist. Each multiplexer serves exactly one port and owns one channel, which is responsible for creating the underlying UDP socket and binding the port. On success the UDT socket's state moves from INIT to OPENED.

In other words, UDT bind associates the UDT socket with a multiplexer (CMultiplexer). The channel acts as the real UDP socket and carries out the actual I/O, with two worker threads (one sending, one receiving) completing the data transfer. The send and receive queues belong to the multiplexer, but because the UDT socket stores the multiplexer ID and direct pointers to the queues, it talks to the channel directly when sending data and never needs to look the multiplexer up again.

int CUDTUnited::bind(const UDTSOCKET u, ...)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   s->m_pUDT->open();  // initialize the many parameters inside m_pUDT
   updateMux(s, name); // update the multiplexer
   s->m_Status = OPENED; // move the UDT socket state to OPENED

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr); 

   return 0;
}
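At the API level these two forms correspond to UDT::bind and UDT::bind2 in the UDT4 headers. A hedged sketch of both, where local_addr is a prepared sockaddr_in and udp_fd is assumed to be a UDP socket the application has already created and bound itself:

// form 1: let UDT create and bind the underlying UDP socket
UDT::bind(u, (sockaddr*)&local_addr, sizeof(local_addr));

// form 2: hand an existing system UDP socket over to UDT
// (useful when the application needs control over the UDP socket itself)
UDPSOCKET udp_fd = ...;
UDT::bind2(u, udp_fd);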

A port may be shared by several UDT sockets, so binding a port really means registering with the unique multiplexer of that port.

  • void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls)
  • void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)

void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
   CGuard cg(m_ControlLock);

   if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
   {
      int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);

      // find a reusable address
      for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
      {
         if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
         {
            if (i->second.m_iPort == port) // found the multiplexer of this port
            {
               // reuse the existing multiplexer
               ++ i->second.m_iRefCount; // increment its reference count
               s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue; // send queue
               s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue; // receive queue
               s->m_iMuxID = i->second.m_iID; // tell the UDT socket the multiplexer ID
               return;
            }
         }
      }
   }

   // a new multiplexer is needed
   CMultiplexer m;
   m.m_iMSS = s->m_pUDT->m_iMSS;
   m.m_iIPversion = s->m_pUDT->m_iIPversion;
   m.m_iRefCount = 1;
   m.m_bReusable = s->m_pUDT->m_bReuseAddr;
   m.m_iID = s->m_SocketID;
   // create the transport channel, setting the IP version and the send/receive buffer sizes (default 65536)
   m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);
   m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
   m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);

   try
   {  // create and bind the UDP socket actually used for transport; it is kept inside the channel
      if (NULL != udpsock)
         m.m_pChannel->open(*udpsock);
      else
         m.m_pChannel->open(addr);
   }
   catch (CUDTException& e)
   {
      m.m_pChannel->close();
      delete m.m_pChannel;
      throw e;
   }
   // fill in the remaining multiplexer fields
   sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   m.m_pChannel->getSockAddr(sa);
   m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
   if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;

   m.m_pTimer = new CTimer;

   m.m_pSndQueue = new CSndQueue; // create the send queue
   m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
   m.m_pRcvQueue = new CRcvQueue; // create the receive queue
   m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);

   m_mMultiplexer[m.m_iID] = m;

   s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
   s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
   s->m_iMuxID = m.m_iID;
}

After the channel has been created, the send and receive queues are allocated, and init must be called on each of them. Besides initializing some parameters, the main job of init is to create the worker thread.

Taking the send queue as an example (the receive queue and its actual use are covered in later articles):

void CSndQueue::init(CChannel* c, CTimer* t)
{
    m_pChannel = c;
    m_pTimer = t;
    m_pSndUList = new CSndUList;
    m_pSndUList->m_pWindowLock = &m_WindowLock;
    m_pSndUList->m_pWindowCond = &m_WindowCond;
    m_pSndUList->m_pTimer = m_pTimer;

    if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
    {
        m_WorkerThread = 0;
        throw CUDTException(3, 1);
    }
}

The worker thread of the send queue:

void* CSndQueue::worker(void* param)
{
   CSndQueue* self = (CSndQueue*)param;
   while (!self->m_bClosing) // loop as long as the queue is in its normal state
   {
      uint64_t ts = self->m_pSndUList->getNextProcTime();
      if (ts > 0)
      {
         // wait until next processing time of the first socket on the list
         uint64_t currtime;
         CTimer::rdtsc(currtime);
         if (currtime < ts)  // not yet due; keep waiting
            self->m_pTimer->sleepto(ts);

         // it is time to send the next pkt
         sockaddr* addr;
         CPacket pkt;
         if (self->m_pSndUList->pop(addr, pkt) < 0)
            continue;

         self->m_pChannel->sendto(addr, pkt);
      }
      else
      {
         // wait here if there are no sockets with data to be sent
         pthread_mutex_lock(&self->m_WindowLock);
         if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
            pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); // wait on the condition variable
         pthread_mutex_unlock(&self->m_WindowLock);
      }
   }
   return NULL;
}

UDT::listen

  • CUDT::listen -> CUDTUnited::listen

UDT listen starts listening on the port once the UDT socket is in the OPENED state; as seen in the bind section, bind must already have succeeded at this point. A port can have only one listening socket. listen here serves C/S mode and is not supported in rendezvous mode. A UDT socket that is already listening will not be made to listen again.

After a successful listen, m_bListening becomes true and the socket state m_Status becomes LISTENING. Two sets are created in the UDT socket, m_pQueuedSockets and m_pAcceptSockets, holding connection requests that have been received but not yet accepted, and those that have already been accepted, respectively. Sets are used because their elements are guaranteed to be unique.

In essence, listen installs the listener on the receive queue m_pRcvQueue of the multiplexer. The queue's worker thread then reacts to each arriving packet according to its type and sends the appropriate response.

int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
   CUDTSocket* s = locate(u);

   CGuard cg(s->m_ControlLock);

   // do nothing if the socket is already listening
   if (LISTENING == s->m_Status)
      return 0;

   // a socket can listen only if is in OPENED status
   if (OPENED != s->m_Status)
      throw CUDTException(5, 5, 0);

   // listen is not supported in rendezvous connection setup
   if (s->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   if (backlog <= 0)
      throw CUDTException(5, 3, 0);

   s->m_uiBackLog = backlog;

   try
   {  // create the set of received-but-not-yet-accepted sockets and the set of accepted ones; a set guarantees each socket appears only once
      s->m_pQueuedSockets = new set<UDTSOCKET>; 
      s->m_pAcceptSockets = new set<UDTSOCKET>;
   }
   catch (...) { ... }

   s->m_pUDT->listen();
   s->m_Status = LISTENING;

   return 0;
}

Setting the Listener of the UDT instance goes through the receive queue, i.e. it is effectively installed in the multiplexer:

void CUDT::listen()
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // listen can be called more than once
   if (m_bListening)
      return;

   // if there is already another socket listening on the same port
   if (m_pRcvQueue->setListener(this) < 0)  // set this instance as the listener of the CRcvQueue
      throw CUDTException(5, 11, 0);

   m_bListening = true; // update the listening state
}

Through listen, the multiplexer is thus given a Listener; when data arrives, it is dispatched to the corresponding UDT instance. The current state of the UDT instance is updated as well.

After setListener, m_pListener is non-NULL. In the loop of CRcvQueue's worker thread, recvfrom receives incoming connection requests, and m_Packet.m_iID is examined to decide whether connect should be invoked.

void* CRcvQueue::worker(void* param)
{
   CRcvQueue* self = (CRcvQueue*)param;

   sockaddr* addr = ...
   CUDT* u = NULL;
   int32_t id;

   while (!self->m_bClosing)
   {
      // fetch a free buffer unit to receive into (availability checks elided)
      CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
      unit->m_Packet.setLength(self->m_iPayloadSize);

      // read the next incoming packet; recvfrom returns -1 if nothing has been received
      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
         goto TIMER_CHECK;

      id = unit->m_Packet.m_iID;

      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
      if (0 == id)
      {
         if (NULL != self->m_pListener)
            self->m_pListener->listen(addr, unit->m_Packet);
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {
            // asynchronous connect: call connect here
            // otherwise wait for the UDT socket to retrieve this packet
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }
      // dispatch of packets with id > 0 to existing sockets is elided here

TIMER_CHECK:
      // timer-driven processing (ACK/retransmission timers) is elided here
      ;
   }

   return NULL;
}

For an arriving connection request, if the listener is non-NULL, the private listen method of CUDT can be invoked. It parses the incoming handshake packet and generates a cookie string. If the packet is an ordinary connection request, the listener replies via the send queue's sendto. If it is a response carrying a cookie that passes verification, a new connection is established.

int CUDT::listen(sockaddr* addr, CPacket& packet)
{
   CHandShake hs;
   hs.deserialize(packet.m_pcData, packet.getLength());

   // SYN cookie
   char clienthost[NI_MAXHOST];
   char clientport[NI_MAXSERV];
   getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV);
   int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000;  // secret changes every one minute
   stringstream cookiestr;
   cookiestr << clienthost << ":" << clientport << ":" << timestamp;
   unsigned char cookie[16];
   CMD5::compute(cookiestr.str().c_str(), cookie);

   // connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: response
   if (1 == hs.m_iReqType) 
   {
      hs.m_iCookie = *(int*)cookie;
      packet.m_iID = hs.m_iID;
      int size = packet.getLength();
      hs.serialize(packet.m_pcData, size);
      m_pSndQueue->sendto(addr, packet);
      return 0;
   }
   else
   {
      if (hs.m_iCookie != *(int*)cookie)
      {
         timestamp --;
         cookiestr << clienthost << ":" << clientport << ":" << timestamp;
         CMD5::compute(cookiestr.str().c_str(), cookie);

         if (hs.m_iCookie != *(int*)cookie)
            return -1;
      }
   }

   int32_t id = hs.m_iID;

   // When a peer side connects in...
   if ((1 == packet.getFlag()) && (0 == packet.getType()))
   { // a control packet whose type is Connection Handshake
      if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
      {
         // mismatch, reject the request
         hs.m_iReqType = 1002;
         int size = CHandShake::m_iContentSize;
         hs.serialize(packet.m_pcData, size);
         packet.m_iID = id;
         m_pSndQueue->sendto(addr, packet);
      }
      else
      {  
         int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
         if (result == -1)
            hs.m_iReqType = 1002;

         // send back a response if connection failed or connection already existed
         // new connection response should be sent in connect()
         if (result != 1)
         {
            int size = CHandShake::m_iContentSize;
            hs.serialize(packet.m_pcData, size);
            packet.m_iID = id;
            m_pSndQueue->sendto(addr, packet);
         }
         else
         {
            // a new connection has been created, enable epoll for write 
            s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
         }
      }
   }

   return hs.m_iReqType;
}
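As a concrete illustration of the cookie scheme above: for a client at the hypothetical address 192.0.2.7, port 50000, during the n-th minute since the listener started, the cookie string is "192.0.2.7:50000:n", and the first 32 bits of its MD5 digest become hs.m_iCookie. Because the timestamp advances every minute, the verification branch also retries with timestamp - 1, so a cookie issued just before a minute boundary is still accepted.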

In the code above, newConnection builds a new connection. Concretely, it records the peer's information in the m_PeerRec map, which stores records of peer connection sockets, and inserts the newly created UDT socket into m_Sockets.

It first checks whether this connection has already been established; if so, it responds with the existing UDT socket's information. If the existing socket is in the BROKEN state, the state is moved to CLOSED and some cleanup is performed; the rest of the cleanup happens during the UDT close procedure.

For a genuinely new connection, a new UDT socket is created and part of its fields are initialized, including m_pSelfAddr, m_SocketID, m_ListenSocket, m_PeerID and m_iISN; the new socket is then bound to the same address as the listening socket, and its state becomes CONNECTED. m_PeerRec and m_Sockets are updated, the socket is inserted into m_pQueuedSockets, the local node information is refreshed, and the epoll events and the timer are updated, waiting for an accept call to pick the connection up.

int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
   CUDTSocket* ns = NULL;
   CUDTSocket* ls = locate(listen); // look up the local listening UDT socket in m_Sockets

   if (NULL == ls)
      return -1;

   // if this connection has already been processed
   if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN))) // look it up in m_PeerRec and m_Sockets
   {
      if (ns->m_pUDT->m_bBroken)
      {
         // last connection from the "peer" address has been broken
         ...
      }
      else
      {
         // connection already exist, this is a repeated connection request
         // respond with existing HS information
         ...
         return 0;
         //except for this situation a new connection should be started
      }
   }

   // exceeding backlog, refuse the connection request
   if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
      return -1;

   try
   {
      ns = new CUDTSocket;
      ns->m_pUDT = new CUDT(*(ls->m_pUDT));
      ...
      ns->m_pSelfAddr = ...
      ...
   }
   catch (...) { ...  }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;
   CGuard::leaveCS(m_IDLock);

   ns->m_ListenSocket = listen;
   ns->m_iIPversion = ls->m_iIPversion;
   ns->m_pUDT->m_SocketID = ns->m_SocketID;
   ns->m_PeerID = hs->m_iID;
   ns->m_iISN = hs->m_iISN;

   int error = 0;

   try
   {
      // bind to the same addr of listening socket
      ns->m_pUDT->open();
      updateMux(ns, ls);
      ns->m_pUDT->connect(peer, hs);
   }
   catch (...)
   {
      error = 1;
      goto ERR_ROLLBACK;
   }

   ns->m_Status = CONNECTED;

   // copy address information of local node
   ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
   CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;
      m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 2;
   }
   CGuard::leaveCS(m_ControlLock);

   CGuard::enterCS(ls->m_AcceptLock);
   try
   {
      ls->m_pQueuedSockets->insert(ns->m_SocketID);
   }
   catch (...)
   {
      error = 3;
   }
   CGuard::leaveCS(ls->m_AcceptLock);

   // acknowledge users waiting for new connections on the listening socket
   m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);

   CTimer::triggerEvent();

   ERR_ROLLBACK:
   if (error > 0)
   {
      ns->m_pUDT->close();
      ns->m_Status = CLOSED;
      ns->m_TimeStamp = CTimer::getTime();

      return -1;
   }

   // wake up a waiting accept() call
   #ifndef WIN32
      pthread_mutex_lock(&(ls->m_AcceptLock));
      pthread_cond_signal(&(ls->m_AcceptCond));
      pthread_mutex_unlock(&(ls->m_AcceptLock));
   #else
      SetEvent(ls->m_AcceptCond);
   #endif

   return 1;
}

UDT connect

  • CUDT::connect( api.cpp, CUDT::public method) -> CUDTUnited::connect -> CUDT::connect( core.cpp, CUDT::private method)

To be able to connect, a UDT socket must be in the INIT or OPENED state. INIT means the socket was just created: the parameters inside m_pUDT are initialized, the socket is registered with a multiplexer, and the state moves to OPENED. A socket already in OPENED (for example, one that has been bound) can then enter CONNECTING, and m_pUDT->connect is invoked. The peer address is recorded in m_pPeerAddr inside the UDT socket.

int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);
   CGuard cg(s->m_ControlLock);

   // a socket can "connect" only if it is in INIT or OPENED status
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)
      {
         s->m_pUDT->open();
         updateMux(s);
         s->m_Status = OPENED;
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);

   // connect_complete() may be called before connect() returns.
   // So we need to update the status before connect() is called,
   // otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);
   }
   catch (CUDTException e)
   {
      s->m_Status = OPENED;
      throw e;
   }

   // record peer address
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}

Among the private methods of CUDT, connect has three forms:

// Functionality:
//    Connect to a UDT entity listening at address "peer".
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
// Returned value:
//    None.
void CUDT::connect(const sockaddr* serv_addr) 

// Functionality:
//    Process the response handshake packet.
// Parameters:
//    0) [in] pkt: handshake packet.
// Returned value:
//    Return 0 if connected, positive value if connection is in progress, otherwise error code.
int CUDT::connect(const CPacket& response) throw ()

// Functionality:
//    Connect to a UDT entity listening at address "peer", which has sent "hs" request.
// Parameters:
//    0) [in] peer: The address of the listening UDT entity.
//    1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out).
// Returned value:
//    None.
void CUDT::connect(const sockaddr* peer, CHandShake* hs)

The connect invoked from CUDTUnited::connect takes a sockaddr parameter, i.e. the first form; this is also the function reached from the external API UDT::connect:

void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened) // the UDT socket must be in the OPENED state
      throw CUDTException(5, 0, 0);

   if (m_bListening) // cannot listen and connect at the same time
      throw CUDTException(5, 2, 0);

   if (m_bConnecting || m_bConnected) // must not have connected before
      throw CUDTException(5, 2, 0);

   m_bConnecting = true; // update the state to prevent repeated connects

   // record peer/server address
   delete m_pPeerAddr;
   m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
   memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

   // register this socket in the rendezvous queue
   // RendezvousQueue is used to temporarily store incoming handshakes; non-rendezvous connections also require this step
   uint64_t ttl = 3000000; // this is also why close may have to wait an extra 3 s
   if (m_bRendezvous)
      ttl *= 10;
   ttl += CTimer::getTime();
   // insert the UDT socket into the list m_lRendezvousID for temporary storage; this happens whether or not
   // rendezvous mode is used, which can be misleading
   m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl); 

   // This is my current configurations
   m_ConnReq.m_iVersion = m_iVersion;
   m_ConnReq.m_iType = m_iSockType;
   m_ConnReq.m_iMSS = m_iMSS;
   m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
   m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
   m_ConnReq.m_iID = m_SocketID;
   CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

   // Random Initial Sequence Number
   srand((unsigned int)CTimer::getTime());
   m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

   m_iLastDecSeq = m_iISN - 1;
   m_iSndLastAck = m_iISN;
   m_iSndLastDataAck = m_iISN;
   m_iSndCurrSeqNo = m_iISN - 1;
   m_iSndLastAck2 = m_iISN;
   m_ullSndLastAck2Time = CTimer::getTime();

   // Inform the server my configurations.
   CPacket request;
   char* reqdata = new char [m_iPayloadSize];
   request.pack(0, NULL, reqdata, m_iPayloadSize); // build the packet
   // ID = 0, connection request
   request.m_iID = 0;

   int hs_size = m_iPayloadSize;
   m_ConnReq.serialize(reqdata, hs_size); // write the request
   request.setLength(hs_size);
   m_pSndQueue->sendto(serv_addr, request); // send the request
   m_llLastReqTime = CTimer::getTime(); // refresh the request timer

   // asynchronous connect, return immediately
   if (!m_bSynRecving)
   {
      delete [] reqdata;
      return;
   }

   // Wait for the negotiated configurations from the peer side.
   CPacket response;
   char* resdata = new char [m_iPayloadSize];
   response.pack(0, NULL, resdata, m_iPayloadSize);

   CUDTException e(0, 0);

   while (!m_bClosing) // wait for connect to complete, at most 3 s; the request is resent if there is no response
   {
      // avoid sending too many requests, at most 1 request per 250ms
      if (CTimer::getTime() - m_llLastReqTime > 250000)
      {
         m_ConnReq.serialize(reqdata, hs_size);
         request.setLength(hs_size);
         if (m_bRendezvous)
            request.m_iID = m_ConnRes.m_iID;
         m_pSndQueue->sendto(serv_addr, request);
         m_llLastReqTime = CTimer::getTime();
      }

      response.setLength(m_iPayloadSize);
      if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
      {
         if (connect(response) <= 0)
            break;

         // new request/response should be sent out immediately on receiving a response
         m_llLastReqTime = 0;
      }

      if (CTimer::getTime() > ttl)
      {
         // timeout
         e = CUDTException(1, 1, 0);
         break;
      }
   }

   delete [] reqdata;
   delete [] resdata;

   if (e.getErrorCode() == 0)
   {
      if (m_bClosing)                                                 // if the socket is closed before connection...
         e = CUDTException(1);
      else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
         e = CUDTException(1, 2, 0);
      else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // security check
         e = CUDTException(1, 4, 0);
   }

   if (e.getErrorCode() != 0)
      throw e;
}

After the connect request has been sent, a synchronous (blocking) connect waits for the reply with a 3 s timeout, resending the request every 250 ms until a response arrives. Once a response is received, connect(response) is invoked, i.e. the second form of connect.

The connection setup can be understood by analogy with TCP's half connections: this second connect completes the second half of the connection and performs the final negotiation. For non-rendezvous mode, the socket is removed from m_lRendezvousID, all connection parameters are re-configured according to the negotiated values, and the internal data structures of the UDT socket are created: send and receive buffers, loss lists, windows and so on. These structures are used during data transfer and serve UDT's core transport algorithms, including congestion avoidance and retransmission, so the congestion-control parameters are initialized here as well. Finally, the state is set to connected, and connect_complete and update_events notify the management module and epoll of the state change.

int CUDT::connect(const CPacket& response) throw ()
{
   // this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
   // returning -1 means there is an error.
   // returning 1 or 2 means the connection is in process and needs more handshake

   if (!m_bConnecting)
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      //a data packet or a keep-alive packet comes, which means the peer side is already connected
      // in this situation, the previously recorded response will be used
      goto POST_CONNECT;
   }

   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());

   if (m_bRendezvous)
   {
      // regular connect should NOT communicate with rendezvous connect
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   // Remove from rendezvous queue
   m_pRcvQueue->removeConnector(m_SocketID);

   // Re-configure according to the negotiated values.
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;                             // 28 = 20-byte IPv4 header + 8-byte UDP header
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize; // minus the 16-byte UDT packet header
   m_iPeerISN = m_ConnRes.m_iISN;
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   // Prepare all data structures
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }

   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // And, I am connected too.
   m_bConnecting = false;
   m_bConnected = true;

   // register this socket for receiving data packets
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   // acknowledge the management module.
   s_UDTUnited.connect_complete(m_SocketID); // refresh the local node info from m_pSndQueue->m_pChannel and set the state to CONNECTED

   // acknowledge any waiting epolls to write
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   return 0;
}
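As a concrete check of the re-configuration above: with UDT's default MSS of 1500 bytes, m_iPktSize = 1500 - 28 = 1472 (the 28 bytes being the IPv4 and UDP headers), and m_iPayloadSize = 1472 - 16 = 1456, since the UDT packet header CPacket::m_iPktHdrSize is 16 bytes.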

UDT accept

  • UDT::accept -> CUDT::accept -> CUDTUnited::accept

UDT accept plays the same role as accept on a TCP socket: once the socket has been bound and put into listening, it waits for incoming connections, so the listener's current state must be LISTENING. It is only used in non-rendezvous mode.

The main body is a while loop waiting for an accept event. When one arrives, the socket is removed from m_pQueuedSockets and inserted into m_pAcceptSockets, update_events reports the event change to epoll, and the peer address is stored.

UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
{
   CUDTSocket* ls = locate(listen);

   // the "listen" socket must be in LISTENING status
   if (LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // no "accept" in rendezvous connection setup
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one connection can be set up each time!!
   #ifndef WIN32
      while (!accepted)   // loop waiting for a connection to arrive; exits when accepted == true
      {
         pthread_mutex_lock(&(ls->m_AcceptLock));

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // This socket has been closed.
            accepted = true;
         }
         else if (ls->m_pQueuedSockets->size() > 0)
         {  // update m_pAcceptSockets and m_pQueuedSockets
            u = *(ls->m_pQueuedSockets->begin()); 
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
         {
            accepted = true;
         }

         if (!accepted && (LISTENING == ls->m_Status))
            pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);

         pthread_mutex_unlock(&(ls->m_AcceptLock));
      }
   #else
      while (!accepted)
      {
         WaitForSingleObject(ls->m_AcceptLock, INFINITE);

         if (ls->m_pQueuedSockets->size() > 0)
         {
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());

            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;

         ReleaseMutex(ls->m_AcceptLock);

         if  (!accepted && (LISTENING == ls->m_Status))
            WaitForSingleObject(ls->m_AcceptCond, INFINITE);

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // Send signal to other threads that are waiting to accept.
            SetEvent(ls->m_AcceptCond);
            accepted = true;
         }

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
      }
   #endif

   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   // store the peer's address
   if ((addr != NULL) && (addrlen != NULL))
   {
      if (AF_INET == locate(u)->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
   }

   return u;
}

Summary

UDT supports two connection setup modes: C/S mode and rendezvous mode. The UDT client sends a handshake message (a control packet of type 0) to the server or the peer. The format of the information carried in the message is described in the article UDT最新协议分析 (UDT protocol analysis).

C/S Mode -- Four-Way Handshake

When a UDT socket acts as a server, it creates a UDT entity and listens on the bound port as the Listener. When a new connection request arrives, a new UDT socket is created, its fields are initialized, and the information of the new UDT socket is recorded in the Listener. This is the connection establishment process, quite similar to TCP's.

  1. When a UDT client wants to connect to a UDT server, it sends handshake packets every 250 ms for up to 3 s, until it receives the server's handshake reply or the connection times out.
  2. When the UDT server receives a handshake request from the client for the first time, it generates a cookie from the client's address and a secret key, and sends it back to the client.
  3. When the UDT client receives the reply, it must send the received cookie back to the server.
  4. When the UDT server receives a handshake packet with a correct cookie, it negotiates the packet size and maximum window size, and sends the result to the client.
  • The UDT server extracts the packet size and maximum window size from the handshake packet, compares them with its own values, and adopts the smaller of each.
  • The UDT server sends the negotiated packet size and maximum window size back to the client, together with its version number and initial sequence number. To guard against packet loss, it keeps responding to any further handshake messages from the same peer.
  • The UDT server is then ready to send and receive data.
  • Once the UDT client receives the server's handshake packet, it starts sending and receiving data and no longer replies to further handshake messages.
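Put together, the exchange can be summarized as follows (a simplified view; retransmissions are omitted):

client                                       server
  | -- HS, ReqType = 1 --------------------> |  repeated every 250 ms, up to 3 s
  | <-- HS, ReqType = 1, cookie ------------ |
  | -- HS, ReqType = -1, cookie -----------> |  cookie verified -> newConnection()
  | <-- HS, negotiated values -------------- |  both sides CONNECTED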

Rendezvous Mode -- Three-Way Handshake

In rendezvous mode, both endpoints act as clients and must call UDT::connect toward each other at the same time; it is mainly used for NAT traversal.

  1. The two UDT clients send connection requests (handshake packets) to each other simultaneously. The initial connection type is 0.
  2. When a UDT client receives the peer's connection request, it checks the connection type:
  • if the connection type is 0, the response is set to -1;
  • if the connection type is -1, the response is set to -2;
  • if the connection type is -2, no response is sent.
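In code terms (see the rendezvous branch of CUDT::connect(const CPacket&) above), the request type thus steps from 0 to -1 to -2 on each side; one possible interleaving of the three-way exchange:

peer A                                       peer B
  | -- HS, ReqType = 0 --------------------> |
  | <-- HS, ReqType = -1 ------------------- |  a request of type 0 is answered with -1
  | -- HS, ReqType = -2 -------------------> |  a request of type -1 is answered with -2
                                                a request of type -2 gets no reply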