- UDT::socket -> UDT::setsockopt -> UDT::connect -> UDT::send -> UDT::close
- UDT::socket -> UDT::setsockopt -> UDT::bind -> UDT::listen -> UDT::accept -> UDT::recv -> UDT::close
- UDT::setsockopt(usock, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool))
The detailed source code analysis of this mode is in the next article.
- UDT::epoll_create -> UDT::epoll_add_usock/epoll_add_ssock -> UDT::epoll_wait/epoll_wait2 -> UDT::epoll_release
- UDT::socket -> CUDT::socket -> CUDTUnited::newSocket
Creating a UDT socket involves the following steps:
- If the GC thread is not running yet, start it first;
- Allocate a new CUDTSocket and initialize its members: the internal CUDT instance m_pUDT, the address information m_pSelfAddr, and the socket identifier m_SocketID; set the socket state m_Status to INIT and m_ListenSocket to 0;
- Register the UDT socket's information into m_pUDT, including m_SocketID, m_iSockType, m_iIPversion, and m_pCache;
- Insert the UDT socket into the global m_Sockets map;
- Return the identifier m_SocketID.
As before, the code analysis starts from the externally exposed interface call.
UDTSOCKET CUDT::socket(int af, int type, int)
{
if (!s_UDTUnited.m_bGCStatus)
s_UDTUnited.startup(); // start the GC thread if it is not running yet
return s_UDTUnited.newSocket(af, type); // create a UDT socket
}
Next comes the creation of the UDT socket in newSocket.
UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
CUDTSocket* ns = NULL;
try
{
ns = new CUDTSocket; // allocate a new UDT socket
ns->m_pUDT = new CUDT; // the CUDT instance is created along with the socket
if (AF_INET == af)
{
ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
}
else
{
ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6); // IPv6 is supported
((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
}
}
catch (...) { ... }
CGuard::enterCS(m_IDLock);
ns->m_SocketID = -- m_SocketID; // decrement from the randomly initialized value to produce the UDT socket ID
CGuard::leaveCS(m_IDLock);
ns->m_Status = INIT; // set the state to INIT
ns->m_ListenSocket = 0;
ns->m_pUDT->m_SocketID = ns->m_SocketID; // register the newly assigned socket ID into the CUDT instance
ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
ns->m_pUDT->m_pCache = m_pCache;
// protect the m_Sockets structure.
CGuard::enterCS(m_ControlLock);
try
{
m_Sockets[ns->m_SocketID] = ns; // insert the UDT socket into the global m_Sockets map
}
catch (...)
{
//failure and rollback
...
}
CGuard::leaveCS(m_ControlLock);
return ns->m_SocketID;
}
m_SocketID is initialized to a random number in the CUDTUnited constructor, which is invoked when the system is initialized in startup.
CUDTUnited::CUDTUnited():
m_SocketID(0),
{
// Socket ID MUST start from a random value
srand((unsigned int)CTimer::getTime());
m_SocketID = 1 + (int)((1 << 30) * (double(rand()) / RAND_MAX));
}
UDT socket option configuration:
- CUDT::setsockopt -> CUDT::setOpt
- CUDT::getsockopt -> CUDT::getOpt
The configurable UDT options include a number of library-defined parameters, declared as follows:
enum UDTOpt
{
UDT_MSS, // the Maximum Transfer Unit
UDT_SNDSYN, // if sending is blocking
UDT_RCVSYN, // if receiving is blocking
UDT_CC, // custom congestion control algorithm
UDT_FC, // Flight flag size (window size)
UDT_SNDBUF, // maximum buffer in sending queue
UDT_RCVBUF, // UDT receiving buffer size
UDT_LINGER, // waiting for unsent data when closing
UDP_SNDBUF, // UDP sending buffer size
UDP_RCVBUF, // UDP receiving buffer size
UDT_MAXMSG, // maximum datagram message size
UDT_MSGTTL, // time-to-live of a datagram message
UDT_RENDEZVOUS, // rendezvous connection mode
UDT_SNDTIMEO, // send() timeout
UDT_RCVTIMEO, // recv() timeout
UDT_REUSEADDR, // reuse an existing port or create a new one
UDT_MAXBW, // maximum bandwidth (bytes per second) that the connection can use
UDT_STATE, // current socket state, see UDTSTATUS, read only
UDT_EVENT, // current available events associated with the socket
UDT_SNDDATA, // size of data in the sending buffer
UDT_RCVDATA // size of data available for recv
};
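As a quick illustration of how these options are set and queried (a sketch; the buffer values are arbitrary assumptions, not tuning advice):

UDTSOCKET u = UDT::socket(AF_INET, SOCK_STREAM, 0);

// enlarge the UDT and UDP send buffers
int udt_sndbuf = 12058624;
UDT::setsockopt(u, 0, UDT_SNDBUF, &udt_sndbuf, sizeof(int));
int udp_sndbuf = 4194304;
UDT::setsockopt(u, 0, UDP_SNDBUF, &udp_sndbuf, sizeof(int));

// enable rendezvous mode; this is typically done before bind/connect
bool rendezvous = true;
UDT::setsockopt(u, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));

// read back a read-only option: the current socket state (see UDTSTATUS)
int32_t state;
int len = sizeof(int32_t);
UDT::getsockopt(u, 0, UDT_STATE, &state, &len);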
- CUDT::bind -> CUDTUnited::bind
- Two forms:
- int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
- int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
The UDT bind procedure involves quite a few modules. In short, it registers the newly created UDT socket's information with a multiplexer, creating the multiplexer if it does not exist yet. Each multiplexer serves exactly one port, and each owns a channel that handles creating the UDP socket and binding the port. bind also moves the UDT socket's state from INIT to OPENED.
In other words, UDT bind ties the UDT socket to a CMultiplexer. The channel is the component that actually operates the UDP socket, and two worker threads, one for sending and one for receiving, carry out the data transfer. The send and receive queues belong to the multiplexer, but since the UDT socket keeps the queue pointers (along with the multiplexer ID), it works with the channel directly when sending data and never has to look the multiplexer up again.
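The pieces just described live in the CMultiplexer structure. The following field list is reconstructed from how updateMux (below) uses it, so treat it as an abridged sketch rather than the authoritative definition:

struct CMultiplexer
{
    CSndQueue* m_pSndQueue; // sending queue, with its own worker thread
    CRcvQueue* m_pRcvQueue; // receiving queue, with its own worker thread
    CChannel*  m_pChannel;  // owns the real UDP socket
    CTimer*    m_pTimer;    // shared timer

    int  m_iPort;           // the UDP port this multiplexer is bound to
    int  m_iIPversion;      // AF_INET or AF_INET6
    int  m_iMSS;            // maximum segment size
    int  m_iRefCount;       // number of UDT sockets sharing this multiplexer
    bool m_bReusable;       // whether other sockets may reuse the port
    int  m_iID;             // multiplexer ID (the creating socket's m_SocketID)
};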
int CUDTUnited::bind(const UDTSOCKET u, ...)
{
CUDTSocket* s = locate(u);
CGuard cg(s->m_ControlLock);
// cannot bind a socket more than once
if (INIT != s->m_Status)
throw CUDTException(5, 0, 0);
s->m_pUDT->open(); // initialize a batch of parameters inside m_pUDT
updateMux(s, name); // update (or create) the multiplexer
s->m_Status = OPENED; // move the UDT socket state to OPENED
// copy address information of local node
s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);
return 0;
}
A port may be shared by several UDT sockets, so binding a port really means registering with that port's unique multiplexer.
- void CUDTUnited::updateMux(CUDTSocket* s, const CUDTSocket* ls)
- void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
CGuard cg(m_ControlLock);
if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
{
int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);
// find a reusable address
for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
{
if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
{
if (i->second.m_iPort == port) // found the multiplexer for this port
{
// reuse the existing multiplexer
++ i->second.m_iRefCount; // bump the reference count
s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue; // sending queue
s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue; // receiving queue
s->m_iMuxID = i->second.m_iID; // tell the UDT socket the multiplexer ID
return;
}
}
}
}
// a new multiplexer is needed
CMultiplexer m;
m.m_iMSS = s->m_pUDT->m_iMSS;
m.m_iIPversion = s->m_pUDT->m_iIPversion;
m.m_iRefCount = 1;
m.m_bReusable = s->m_pUDT->m_bReuseAddr;
m.m_iID = s->m_SocketID;
// create the transport channel; set the IP version and the send/receive buffer sizes (default 65536)
m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);
m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);
try
{ // create and bind the UDP socket actually used for transport; it is kept inside the channel
if (NULL != udpsock)
m.m_pChannel->open(*udpsock);
else
m.m_pChannel->open(addr);
}
catch (CUDTException& e)
{
m.m_pChannel->close();
delete m.m_pChannel;
throw e;
}
// fill in the remaining multiplexer parameters
sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
m.m_pChannel->getSockAddr(sa);
m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;
m.m_pTimer = new CTimer;
m.m_pSndQueue = new CSndQueue; // create the sending queue
m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
m.m_pRcvQueue = new CRcvQueue; // create the receiving queue
m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);
m_mMultiplexer[m.m_iID] = m;
s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
s->m_iMuxID = m.m_iID;
}
After the channel is created, the send and receive queues are allocated, and each queue's init function must be called as well. Besides initializing some parameters, init's main job is to create the worker thread.
Taking the sending queue as the example (the receiving queue and its concrete use are covered in later articles):
void CSndQueue::init(CChannel* c, CTimer* t)
{
m_pChannel = c;
m_pTimer = t;
m_pSndUList = new CSndUList;
m_pSndUList->m_pWindowLock = &m_WindowLock;
m_pSndUList->m_pWindowCond = &m_WindowCond;
m_pSndUList->m_pTimer = m_pTimer;
if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
{
m_WorkerThread = 0;
throw CUDTException(3, 1);
}
}
The sending queue's worker thread:
void* CSndQueue::worker(void* param)
{
CSndQueue* self = (CSndQueue*)param;
while (!self->m_bClosing) // keep looping as long as the queue is in a normal state
{
uint64_t ts = self->m_pSndUList->getNextProcTime();
if (ts > 0)
{
// wait until next processing time of the first socket on the list
uint64_t currtime;
CTimer::rdtsc(currtime);
if (currtime < ts) // not yet time; keep waiting
self->m_pTimer->sleepto(ts);
// it is time to send the next pkt
sockaddr* addr;
CPacket pkt;
if (self->m_pSndUList->pop(addr, pkt) < 0)
continue;
self->m_pChannel->sendto(addr, pkt);
}
else
{
// wait here if there are no sockets with data to be sent
pthread_mutex_lock(&self->m_WindowLock);
if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock); // wait on the condition variable
pthread_mutex_unlock(&self->m_WindowLock);
}
}
return NULL;
}
- CUDT::listen -> CUDTUnited::listen
UDT listen starts listening on the port when the UDT socket is in the OPENED state; as the bind section showed, the socket has been bound successfully by this point. Only one listening socket may exist on a port. listen here serves client/server mode and does not support rendezvous mode. Calling listen again on a UDT socket that is already listening has no effect.
After UDT listen succeeds, m_bListening becomes true and the UDT socket state m_Status becomes LISTENING. Two sets are created in the UDT socket, m_pQueuedSockets and m_pAcceptSockets, holding connection requests that have been received but not yet accepted, and those already accepted, respectively. Sets are used precisely because their elements are unique.
In effect, UDT listen installs the listener on the receive queue m_pRcvQueue inside the multiplexer. The queue's worker thread then responds to incoming packets according to their type and sends the replies.
int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
CUDTSocket* s = locate(u);
CGuard cg(s->m_ControlLock);
// do nothing if the socket is already listening
if (LISTENING == s->m_Status)
return 0;
// a socket can listen only if is in OPENED status
if (OPENED != s->m_Status)
throw CUDTException(5, 5, 0);
// listen is not supported in rendezvous connection setup
if (s->m_pUDT->m_bRendezvous)
throw CUDTException(5, 7, 0);
if (backlog <= 0)
throw CUDTException(5, 3, 0);
s->m_uiBackLog = backlog;
try
{ // create the set of received-but-not-yet-accepted sockets and the set of accepted ones; a set keeps each socket unique
s->m_pQueuedSockets = new set<UDTSOCKET>;
s->m_pAcceptSockets = new set<UDTSOCKET>;
}
catch (...) { ... }
s->m_pUDT->listen();
s->m_Status = LISTENING;
return 0;
}
The listener is installed into the UDT instance through the receive queue, which effectively registers it with the multiplexer:
void CUDT::listen()
{
CGuard cg(m_ConnectionLock);
if (!m_bOpened)
throw CUDTException(5, 0, 0);
if (m_bConnecting || m_bConnected)
throw CUDTException(5, 2, 0);
// listen can be called more than once
if (m_bListening)
return;
// if there is already another socket listening on the same port
if (m_pRcvQueue->setListener(this) < 0) // register this as the listener of the CRcvQueue
throw CUDTException(5, 11, 0);
m_bListening = true; // mark as listening
}
Setting the listener through UDT listen lets the multiplexer dispatch incoming data to the corresponding UDT instance; the UDT's current state is updated at the same time.
After setListener, m_pListener is non-NULL. In the loop of CRcvQueue's thread function worker, recvfrom receives the connection request, and m_Packet.m_iID is checked to decide what to do with it (hand it to the listener, or call connect for a rendezvous socket).
void* CRcvQueue::worker(void* param)
{
CRcvQueue* self = (CRcvQueue*)param;
sockaddr* addr = ...
CUDT* u = NULL;
int32_t id;
while (!self->m_bClosing)
{
unit->m_Packet.setLength(self->m_iPayloadSize);
// reading next incoming packet, recvfrom returns -1 if nothing has been received
if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
goto TIMER_CHECK;
id = unit->m_Packet.m_iID;
// ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
if (0 == id)
{
if (NULL != self->m_pListener)
self->m_pListener->listen(addr, unit->m_Packet);
else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
{
// asynchronous connect: call connect here
// otherwise wait for the UDT socket to retrieve this packet
if (!u->m_bSynRecving)
u->connect(unit->m_Packet);
else
self->storePkt(id, unit->m_Packet.clone());
}
}
}
}
When a connection request arrives and m_pListener is non-NULL, the private listen method of CUDT can be invoked. It parses the incoming connection packet and generates a cookie string. If the packet is a regular connection request, the reply is sent back through the send queue's sendto. If it is a response message whose cookie passes verification, a new connection is established.
int CUDT::listen(sockaddr* addr, CPacket& packet)
{
CHandShake hs;
hs.deserialize(packet.m_pcData, packet.getLength());
// SYN cookie
char clienthost[NI_MAXHOST];
char clientport[NI_MAXSERV];
getnameinfo(addr, (AF_INET == m_iVersion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6), clienthost, sizeof(clienthost), clientport, sizeof(clientport), NI_NUMERICHOST|NI_NUMERICSERV);
int64_t timestamp = (CTimer::getTime() - m_StartTime) / 60000000; // secret changes every one minute
stringstream cookiestr;
cookiestr << clienthost << ":" << clientport << ":" << timestamp;
unsigned char cookie[16];
CMD5::compute(cookiestr.str().c_str(), cookie);
// connection request type: 1: regular connection request, 0: rendezvous connection request, -1/-2: response
if (1 == hs.m_iReqType)
{
hs.m_iCookie = *(int*)cookie;
packet.m_iID = hs.m_iID;
int size = packet.getLength();
hs.serialize(packet.m_pcData, size);
m_pSndQueue->sendto(addr, packet);
return 0;
}
else
{
if (hs.m_iCookie != *(int*)cookie)
{
timestamp --;
cookiestr << clienthost << ":" << clientport << ":" << timestamp;
CMD5::compute(cookiestr.str().c_str(), cookie);
if (hs.m_iCookie != *(int*)cookie)
return -1;
}
}
int32_t id = hs.m_iID;
// When a peer side connects in...
if ((1 == packet.getFlag()) && (0 == packet.getType()))
{ // a control packet whose type is Connection Handshake
if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
{
// mismatch, reject the request
hs.m_iReqType = 1002;
int size = CHandShake::m_iContentSize;
hs.serialize(packet.m_pcData, size);
packet.m_iID = id;
m_pSndQueue->sendto(addr, packet);
}
else
{
int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
if (result == -1)
hs.m_iReqType = 1002;
// send back a response if connection failed or connection already existed
// new connection response should be sent in connect()
if (result != 1)
{
int size = CHandShake::m_iContentSize;
hs.serialize(packet.m_pcData, size);
packet.m_iID = id;
m_pSndQueue->sendto(addr, packet);
}
else
{
// a new connection has been created, enable epoll for write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
}
}
}
return hs.m_iReqType;
}
In the code above, newConnection's main job is to establish a new connection. In essence it adds the peer's information to m_PeerRec, the map in which UDT records peer connection sockets, and inserts the newly created UDT socket into m_Sockets.
It first checks whether this connection has already been established; if so, the information of the existing UDT socket is returned. If that socket is in the BROKEN state, the state is changed to CLOSED and some cleanup is performed; the rest of the cleanup happens during UDT shutdown.
For a genuinely new connection, a new UDT socket is created and part of its parameters are initialized, including m_pSelfAddr, m_SocketID, m_ListenSocket, m_PeerID, and m_iISN; the new socket is then bound to the same address as the listening socket. The state is changed to CONNECTED, m_PeerRec and m_Sockets are updated, the socket is inserted into m_pQueuedSockets, the local node information is refreshed, and the events and timer are updated. The socket then waits for an accept to arrive.
int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
CUDTSocket* ns = NULL;
CUDTSocket* ls = locate(listen); // look up the local UDT socket in m_Sockets
if (NULL == ls)
return -1;
// if this connection has already been processed
if (NULL != (ns = locate(peer, hs->m_iID, hs->m_iISN))) // look up in m_PeerRec and m_Sockets
{
if (ns->m_pUDT->m_bBroken)
{
// last connection from the "peer" address has been broken
...
}
else
{
// connection already exist, this is a repeated connection request
// respond with existing HS information
...
return 0;
//except for this situation a new connection should be started
}
}
// exceeding backlog, refuse the connection request
if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
return -1;
try
{
ns = new CUDTSocket;
ns->m_pUDT = new CUDT(*(ls->m_pUDT));
...
ns->m_pSelfAddr = ...
...
}
catch (...) { ... }
CGuard::enterCS(m_IDLock);
ns->m_SocketID = -- m_SocketID;
CGuard::leaveCS(m_IDLock);
ns->m_ListenSocket = listen;
ns->m_iIPversion = ls->m_iIPversion;
ns->m_pUDT->m_SocketID = ns->m_SocketID;
ns->m_PeerID = hs->m_iID;
ns->m_iISN = hs->m_iISN;
int error = 0;
try
{
// bind to the same addr of listening socket
ns->m_pUDT->open();
updateMux(ns, ls);
ns->m_pUDT->connect(peer, hs);
}
catch (...)
{
error = 1;
goto ERR_ROLLBACK;
}
ns->m_Status = CONNECTED;
// copy address information of local node
ns->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(ns->m_pSelfAddr);
CIPAddress::pton(ns->m_pSelfAddr, ns->m_pUDT->m_piSelfIP, ns->m_iIPversion);
// protect the m_Sockets structure.
CGuard::enterCS(m_ControlLock);
try
{
m_Sockets[ns->m_SocketID] = ns;
m_PeerRec[(ns->m_PeerID << 30) + ns->m_iISN].insert(ns->m_SocketID);
}
catch (...)
{
error = 2;
}
CGuard::leaveCS(m_ControlLock);
CGuard::enterCS(ls->m_AcceptLock);
try
{
ls->m_pQueuedSockets->insert(ns->m_SocketID);
}
catch (...)
{
error = 3;
}
CGuard::leaveCS(ls->m_AcceptLock);
// acknowledge users waiting for new connections on the listening socket
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);
CTimer::triggerEvent();
ERR_ROLLBACK:
if (error > 0)
{
ns->m_pUDT->close();
ns->m_Status = CLOSED;
ns->m_TimeStamp = CTimer::getTime();
return -1;
}
// wake up a waiting accept() call
#ifndef WIN32
pthread_mutex_lock(&(ls->m_AcceptLock));
pthread_cond_signal(&(ls->m_AcceptCond));
pthread_mutex_unlock(&(ls->m_AcceptLock));
#else
SetEvent(ls->m_AcceptCond);
#endif
return 1;
}
- CUDT::connect( api.cpp, CUDT::public method) -> CUDTUnited::connect -> CUDT::connect( core.cpp, CUDT::private method)
For a UDT socket to connect, it must be in either the INIT or the OPENED state. INIT means a freshly created socket: the parameters inside m_pUDT are initialized, the socket is registered with a multiplexer, and its state moves to OPENED. A socket already in OPENED (it may have been bound) can then enter CONNECTING, and m_pUDT->connect is called. The peer address m_pPeerAddr is recorded in the UDT socket's internal structure.
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
CUDTSocket* s = locate(u);
CGuard cg(s->m_ControlLock);
// a socket can "connect" only if it is in INIT or OPENED status
if (INIT == s->m_Status)
{
if (!s->m_pUDT->m_bRendezvous)
{
s->m_pUDT->open();
updateMux(s);
s->m_Status = OPENED;
}
else
throw CUDTException(5, 8, 0);
}
else if (OPENED != s->m_Status)
throw CUDTException(5, 2, 0);
// connect_complete() may be called before connect() returns.
// So we need to update the status before connect() is called,
// otherwise the status may be overwritten with wrong value (CONNECTED vs. CONNECTING).
s->m_Status = CONNECTING;
try
{
s->m_pUDT->connect(name);
}
catch (CUDTException e)
{
s->m_Status = OPENED;
throw e;
}
// record peer address
delete s->m_pPeerAddr;
if (AF_INET == s->m_iIPversion)
{
s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
}
else
{
s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
}
return 0;
}
Within CUDT, the private connect method comes in three forms:
// Functionality:
// Connect to a UDT entity listening at address "peer".
// Parameters:
// 0) [in] peer: The address of the listening UDT entity.
// Returned value:
// None.
void CUDT::connect(const sockaddr* serv_addr)
// Functionality:
// Process the response handshake packet.
// Parameters:
// 0) [in] pkt: handshake packet.
// Returned value:
// Return 0 if connected, positive value if connection is in progress, otherwise error code.
int CUDT::connect(const CPacket& response) throw ()
// Functionality:
// Connect to a UDT entity listening at address "peer", which has sent "hs" request.
// Parameters:
// 0) [in] peer: The address of the listening UDT entity.
// 1) [in/out] hs: The handshake information sent by the peer side (in), negotiated value (out).
// Returned value:
// None.
void CUDT::connect(const sockaddr* peer, CHandShake* hs)
The connect called from CUDTUnited::connect takes a sockaddr parameter, i.e. the first form; this is also the function reached from the public UDT::connect interface:
void CUDT::connect(const sockaddr* serv_addr)
{
CGuard cg(m_ConnectionLock);
if (!m_bOpened) // the UDT socket must be in the OPENED state
throw CUDTException(5, 0, 0);
if (m_bListening) // cannot listen and connect at the same time
throw CUDTException(5, 2, 0);
if (m_bConnecting || m_bConnected) // must not already be connecting or connected
throw CUDTException(5, 2, 0);
m_bConnecting = true; // set the flag so the socket cannot connect twice
// record peer/server address
delete m_pPeerAddr;
m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));
// register this socket in the rendezvous queue
// RendezvousQueue is used to temporarily store incoming handshakes; non-rendezvous connections also require this function
uint64_t ttl = 3000000; // this is also why close may need to wait an extra 3 s
if (m_bRendezvous)
ttl *= 10;
ttl += CTimer::getTime();
// insert the UDT socket into the m_lRendezvousID list as temporary storage; this happens whether or not rendezvous mode is used, which can be misleading
m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);
// This is my current configurations
m_ConnReq.m_iVersion = m_iVersion;
m_ConnReq.m_iType = m_iSockType;
m_ConnReq.m_iMSS = m_iMSS;
m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
m_ConnReq.m_iID = m_SocketID;
CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);
// Random Initial Sequence Number
srand((unsigned int)CTimer::getTime());
m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));
m_iLastDecSeq = m_iISN - 1;
m_iSndLastAck = m_iISN;
m_iSndLastDataAck = m_iISN;
m_iSndCurrSeqNo = m_iISN - 1;
m_iSndLastAck2 = m_iISN;
m_ullSndLastAck2Time = CTimer::getTime();
// Inform the server my configurations.
CPacket request;
char* reqdata = new char [m_iPayloadSize];
request.pack(0, NULL, reqdata, m_iPayloadSize); // build the packet
// ID = 0, connection request
request.m_iID = 0;
int hs_size = m_iPayloadSize;
m_ConnReq.serialize(reqdata, hs_size); // write the request into the packet
request.setLength(hs_size);
m_pSndQueue->sendto(serv_addr, request); // send the request
m_llLastReqTime = CTimer::getTime(); // record the request time
// asynchronous connect, return immediately
if (!m_bSynRecving)
{
delete [] reqdata;
return;
}
// Wait for the negotiated configurations from the peer side.
CPacket response;
char* resdata = new char [m_iPayloadSize];
response.pack(0, NULL, resdata, m_iPayloadSize);
CUDTException e(0, 0);
while (!m_bClosing) // wait for connect to complete, up to 3 s, resending the request when there is no response
{
// avoid sending too many requests, at most 1 request per 250ms
if (CTimer::getTime() - m_llLastReqTime > 250000)
{
m_ConnReq.serialize(reqdata, hs_size);
request.setLength(hs_size);
if (m_bRendezvous)
request.m_iID = m_ConnRes.m_iID;
m_pSndQueue->sendto(serv_addr, request);
m_llLastReqTime = CTimer::getTime();
}
response.setLength(m_iPayloadSize);
if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
{
if (connect(response) <= 0)
break;
// new request/response should be sent out immediately on receiving a response
m_llLastReqTime = 0;
}
if (CTimer::getTime() > ttl)
{
// timeout
e = CUDTException(1, 1, 0);
break;
}
}
delete [] reqdata;
delete [] resdata;
if (e.getErrorCode() == 0)
{
if (m_bClosing) // if the socket is closed before connection...
e = CUDTException(1);
else if (1002 == m_ConnRes.m_iReqType) // connection request rejected
e = CUDTException(1, 2, 0);
else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN)) // security check
e = CUDTException(1, 4, 0);
}
if (e.getErrorCode() != 0)
throw e;
}
After the connect request is sent, a synchronous caller waits for the response with a 3 s timeout, resending the request every 250 ms until a response arrives. Once a response is received, connect(response) is invoked, i.e. the second connect form.
The setup can be understood by analogy with TCP's half-connections: this second connect is the second half of the connection establishment and the final negotiation step. In non-rendezvous mode the socket is removed from m_lRendezvousID and all connection parameters are reconfigured from the negotiated values. The various data structures the UDT socket needs during transfer are then created: send and receive buffers, loss lists, windows, and so on. These internal structures serve UDT's core transfer algorithms, including congestion avoidance and retransmission, so the congestion-control parameters are initialized here as well. Finally the state is set to connected, and connect_complete and update_events notify the management module and epoll of the status change.
int CUDT::connect(const CPacket& response) throw ()
{
// this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
// returning -1 means there is an error.
// returning 1 or 2 means the connection is in process and needs more handshake
if (!m_bConnecting)
return -1;
if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
{
//a data packet or a keep-alive packet comes, which means the peer side is already connected
// in this situation, the previously recorded response will be used
goto POST_CONNECT;
}
if ((1 != response.getFlag()) || (0 != response.getType()))
return -1;
m_ConnRes.deserialize(response.m_pcData, response.getLength());
if (m_bRendezvous)
{
// regular connect should NOT communicate with rendezvous connect
// rendezvous connect require 3-way handshake
if (1 == m_ConnRes.m_iReqType)
return -1;
if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
{
m_ConnReq.m_iReqType = -1;
// the request time must be updated so that the next handshake can be sent out immediately.
m_llLastReqTime = 0;
return 1;
}
}
else
{
// set cookie
if (1 == m_ConnRes.m_iReqType)
{
m_ConnReq.m_iReqType = -1;
m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
m_llLastReqTime = 0;
return 1;
}
}
POST_CONNECT:
// Remove from rendezvous queue
m_pRcvQueue->removeConnector(m_SocketID);
// Re-configure according to the negotiated values.
m_iMSS = m_ConnRes.m_iMSS;
m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
m_iPktSize = m_iMSS - 28; // 28 = 20-byte IPv4 header + 8-byte UDP header
m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
m_iPeerISN = m_ConnRes.m_iISN;
m_iRcvLastAck = m_ConnRes.m_iISN;
m_iRcvLastAckAck = m_ConnRes.m_iISN;
m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
m_PeerID = m_ConnRes.m_iID;
memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);
// Prepare all data structures
try
{
m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
// after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
m_pACKWindow = new CACKWindow(1024);
m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
m_pSndTimeWindow = new CPktTimeWindow();
}
catch (...)
{
throw CUDTException(3, 2, 0);
}
CInfoBlock ib;
ib.m_iIPversion = m_iIPversion;
CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
if (m_pCache->lookup(&ib) >= 0)
{
m_iRTT = ib.m_iRTT;
m_iBandwidth = ib.m_iBandwidth;
}
m_pCC = m_pCCFactory->create();
m_pCC->m_UDT = m_SocketID;
m_pCC->setMSS(m_iMSS);
m_pCC->setMaxCWndSize(m_iFlowWindowSize);
m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
m_pCC->setRcvRate(m_iDeliveryRate);
m_pCC->setRTT(m_iRTT);
m_pCC->setBandwidth(m_iBandwidth);
m_pCC->init();
m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
m_dCongestionWindow = m_pCC->m_dCWndSize;
// And, I am connected too.
m_bConnecting = false;
m_bConnected = true;
// register this socket for receiving data packets
m_pRNode->m_bOnList = true;
m_pRcvQueue->setNewEntry(this);
// acknowledge the management module.
s_UDTUnited.connect_complete(m_SocketID); // refresh the local node info via m_pSndQueue->m_pChannel and set the state to CONNECTED
// acknowledge any waiting epolls to write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
return 0;
}
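A side note on the m_pCC = m_pCCFactory->create() call above: UDT lets applications plug in a custom congestion control class through the UDT_CC option listed earlier. A minimal, hedged sketch (the class name and constants are invented for illustration):

#include <ccc.h>
#include <udt.h>

// A toy congestion controller: fixed inter-packet interval and fixed window.
class CFixedRateCC : public CCC
{
public:
    virtual void init()
    {
        m_dPktSndPeriod = 10.0; // microseconds between packets
        m_dCWndSize = 1000.0;   // congestion window size, in packets
    }
};

// Install it on a socket before connecting:
// UDT::setsockopt(u, 0, UDT_CC, new CCCFactory<CFixedRateCC>,
//                 sizeof(CCCFactory<CFixedRateCC>));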
- UDT::accept -> CUDT::accept -> CUDTUnited::accept
UDT accept works the same way as accept on a TCP socket: once the socket has been bound (and is listening) it waits for incoming connections, so the current UDT listener must be in the LISTENING state. It is used in non-rendezvous mode only.
The body is a while loop waiting for an accept event. When one arrives, the corresponding entry is removed from m_pQueuedSockets and inserted into m_pAcceptSockets, update_events pushes the event change to epoll, and the peer address is stored.
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
{
CUDTSocket* ls = locate(listen);
// the "listen" socket must be in LISTENING status
if (LISTENING != ls->m_Status)
throw CUDTException(5, 6, 0);
// no "accept" in rendezvous connection setup
if (ls->m_pUDT->m_bRendezvous)
throw CUDTException(5, 7, 0);
UDTSOCKET u = CUDT::INVALID_SOCK;
bool accepted = false;
// !!only one connection can be set up each time!!
#ifndef WIN32
while (!accepted) // loop until a connection arrives; exit when accepted becomes true
{
pthread_mutex_lock(&(ls->m_AcceptLock));
if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
{
// This socket has been closed.
accepted = true;
}
else if (ls->m_pQueuedSockets->size() > 0)
{ // move the socket from m_pQueuedSockets to m_pAcceptSockets
u = *(ls->m_pQueuedSockets->begin());
ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
accepted = true;
}
else if (!ls->m_pUDT->m_bSynRecving)
{
accepted = true;
}
if (!accepted && (LISTENING == ls->m_Status))
pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
if (ls->m_pQueuedSockets->empty())
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
pthread_mutex_unlock(&(ls->m_AcceptLock));
}
#else
while (!accepted)
{
WaitForSingleObject(ls->m_AcceptLock, INFINITE);
if (ls->m_pQueuedSockets->size() > 0)
{
u = *(ls->m_pQueuedSockets->begin());
ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
accepted = true;
}
else if (!ls->m_pUDT->m_bSynRecving)
accepted = true;
ReleaseMutex(ls->m_AcceptLock);
if (!accepted && (LISTENING == ls->m_Status))
WaitForSingleObject(ls->m_AcceptCond, INFINITE);
if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
{
// Send signal to other threads that are waiting to accept.
SetEvent(ls->m_AcceptCond);
accepted = true;
}
if (ls->m_pQueuedSockets->empty())
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
}
#endif
if (u == CUDT::INVALID_SOCK)
{
// non-blocking receiving, no connection available
if (!ls->m_pUDT->m_bSynRecving)
throw CUDTException(6, 2, 0);
// listening socket is closed
throw CUDTException(5, 6, 0);
}
// store the peer's address
if ((addr != NULL) && (addrlen != NULL))
{
if (AF_INET == locate(u)->m_iIPversion)
*addrlen = sizeof(sockaddr_in);
else
*addrlen = sizeof(sockaddr_in6);
// copy address information of peer node
memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
}
return u;
}
UDT supports two connection setup modes: client/server mode and rendezvous mode. The UDT client sends a handshake message (a control packet of type 0) to the server or to the peer. The format of the message is described in the article UDT最新协议分析.
When a UDT socket acts as a server, it creates a UDT entity that listens on the bound port as the listener. Whenever a new connection request arrives, it creates a new UDT socket, initializes the relevant information, and records the new socket's information with the listener. This is the connection setup process, quite similar to TCP's; a minimal server-side usage sketch follows the list below.
- When a UDT client wants to connect to a UDT server, it sends handshake packets every 250 ms for up to 3 s, until it receives the server's handshake response or the connection times out.
- When the UDT server receives a handshake request from a client for the first time, it generates a cookie from the client's address and a secret key, and sends it to the client.
- When the UDT client receives the response, it must send the received cookie back to the server.
- When the UDT server receives a handshake packet with a correct cookie, it negotiates the packet size and maximum window size and sends the result to the client.
- The UDT server extracts the packet size and maximum window size from the handshake packet, compares them with its own values, and adopts the smaller of each.
- The UDT server sends the negotiated packet size and maximum window size back to the client, together with the server's version number and initial sequence number. To guard against packet loss, it keeps responding if further handshake messages arrive from the same peer.
- The UDT server gets ready to send and receive data.
- When the UDT client receives the server's handshake packet, it starts sending and receiving data; any further handshake messages are no longer answered.
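For reference, a minimal blocking server following the second call chain from the beginning of this article might look like the sketch below (the port and buffer size are placeholder assumptions; error handling omitted):

#include <arpa/inet.h>
#include <cstring>
#include <udt.h>

int main()
{
    UDT::startup();

    UDTSOCKET serv = UDT::socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in my_addr; // placeholder: listen on port 9000
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(9000);
    my_addr.sin_addr.s_addr = INADDR_ANY;

    UDT::bind(serv, (sockaddr*)&my_addr, sizeof(my_addr)); // INIT -> OPENED
    UDT::listen(serv, 10);                                 // OPENED -> LISTENING

    sockaddr_in peer;
    int peerlen = sizeof(peer);
    UDTSOCKET conn = UDT::accept(serv, (sockaddr*)&peer, &peerlen);

    char buf[1024];
    UDT::recv(conn, buf, sizeof(buf), 0); // blocking receive

    UDT::close(conn);
    UDT::close(serv);
    UDT::cleanup();
    return 0;
}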
In rendezvous mode both endpoints act as clients; the two sides must call UDT::connect at the same time. It is mainly used for NAT traversal, and its usage is sketched after this list.
- The two UDT clients send connection requests (handshake packets) to each other simultaneously; the connection type starts at 0.
- When a UDT client receives the peer's connection request, it checks the connection type:
    - If the connection type is 0, the response is sent with type -1.
    - If the connection type is -1, the response is sent with type -2.
    - If the connection type is -2, no response is sent at all.
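A hedged sketch of rendezvous usage: both peers run essentially the same code, each binding a local port and connecting to the other side's address (the addresses and port are placeholder assumptions):

#include <arpa/inet.h>
#include <cstring>
#include <udt.h>

// Helper: fill an IPv4 sockaddr_in (placeholder addresses below).
static sockaddr_in make_addr(const char* ip, int port)
{
    sockaddr_in a;
    memset(&a, 0, sizeof(a));
    a.sin_family = AF_INET;
    a.sin_port = htons(port);
    inet_pton(AF_INET, ip, &a.sin_addr);
    return a;
}

int main()
{
    UDT::startup();
    UDTSOCKET u = UDT::socket(AF_INET, SOCK_STREAM, 0);

    // Rendezvous mode must be enabled before bind/connect.
    bool rendezvous = true;
    UDT::setsockopt(u, 0, UDT_RENDEZVOUS, &rendezvous, sizeof(bool));

    // Bind the local port, then connect to the peer; the peer does the
    // same toward this host at (roughly) the same time.
    sockaddr_in local = make_addr("0.0.0.0", 9000);
    UDT::bind(u, (sockaddr*)&local, sizeof(local));

    sockaddr_in peer = make_addr("192.0.2.10", 9000); // placeholder peer
    UDT::connect(u, (sockaddr*)&peer, sizeof(peer));

    UDT::close(u);
    UDT::cleanup();
    return 0;
}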