UDT本身提供了一些测试案例,位于UDT/app目录中,如appclient.cpp/appserver.cpp等。本节以appclient.cpp/appserver.cpp为例,介绍UDT的基本操作以及client/server的初始化流程。本节主要介绍初始阶段用到的UDT::startup、UDT::cleanup、UDT::socket、UDT::bind、UDT::listen这几个接口。关于UDT::connect和UDT::accept,放到握手部分来讲解。
#ifndef WIN32
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#endif
#include <iostream>
#include <udt.h>
#include "cc.h"
#include "test_util.h"
using namespace std;
#ifndef WIN32
void* monitor(void*);
#else
DWORD WINAPI monitor(LPVOID);
#endif
int main(int argc, char* argv[])
{
if ((3 != argc) || (0 == atoi(argv[2])))
{
cout << "usage: appclient server_ip server_port" << endl;
return 0;
}
// Automatically start up and clean up UDT module.
UDTUpDown _udt_;//用RAII手法进行资源管理,其相关定义位于test_util.h中。
struct addrinfo hints, *local, *peer;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_flags = AI_PASSIVE;
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
//hints.ai_socktype = SOCK_DGRAM;
if (0 != getaddrinfo(NULL, "9000", &hints, &local))
{
cout << "incorrect network address.\n" << endl;
return 0;
}
UDTSOCKET client = UDT::socket(local->ai_family, local->ai_socktype, local->ai_protocol);//建立UDT 套接口,和TCP的一样
// UDT Options
//UDT::setsockopt(client, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
//UDT::setsockopt(client, 0, UDT_MSS, new int(9000), sizeof(int));
//UDT::setsockopt(client, 0, UDT_SNDBUF, new int(10000000), sizeof(int));
//UDT::setsockopt(client, 0, UDP_SNDBUF, new int(10000000), sizeof(int));
//UDT::setsockopt(client, 0, UDT_MAXBW, new int64_t(12500000), sizeof(int));
// Windows UDP issue
// For better performance, modify HKLM\System\CurrentControlSet\Services\Afd\Parameters\FastSendDatagramThreshold
#ifdef WIN32
UDT::setsockopt(client, 0, UDT_MSS, new int(1052), sizeof(int));
#endif
// for rendezvous connection, enable the code below//采用汇合连接时,需要进行的设置。汇合连接相关部分,详见UDT连接分析。
/*
UDT::setsockopt(client, 0, UDT_RENDEZVOUS, new bool(true), sizeof(bool));
if (UDT::ERROR == UDT::bind(client, local->ai_addr, local->ai_addrlen))
{
cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
*/
freeaddrinfo(local);
if (0 != getaddrinfo(argv[1], argv[2], &hints, &peer))
{
cout << "incorrect server/peer address. " << argv[1] << ":" << argv[2] << endl;
return 0;
}
// connect to the server, implict bind
if (UDT::ERROR == UDT::connect(client, peer->ai_addr, peer->ai_addrlen))//建立连接,其相关协议和源码分析,详见UDT连接分析。
{
cout << "connect: " << UDT::getlasterror().getErrorMessage() << endl;
return 0;
}
freeaddrinfo(peer);
// using CC method
//CUDPBlast* cchandle = NULL;
//int temp;
//UDT::getsockopt(client, 0, UDT_CC, &cchandle, &temp);
//if (NULL != cchandle)
// cchandle->setRate(500);
int size = 100000;
char* data = new char[size];
#ifndef WIN32
pthread_create(new pthread_t, NULL, monitor, &client);
#else
CreateThread(NULL, 0, monitor, &client, 0, NULL);
#endif
for (int i = 0; i < 1000000; i ++)
{
int ssize = 0;
int ss;
while (ssize < size)
{
if (UDT::ERROR == (ss = UDT::send(client, data + ssize, size - ssize, 0)))//发送数据
{
cout << "send:" << UDT::getlasterror().getErrorMessage() << endl;
break;
}
ssize += ss;
}
if (ssize < size)
break;
}
UDT::close(client);//关闭UDT连接
delete [] data;
return 0;
}
#ifndef WIN32
void* monitor(void* s)//性能监视程序
#else
DWORD WINAPI monitor(LPVOID s)
#endif
{
UDTSOCKET u = *(UDTSOCKET*)s;
UDT::TRACEINFO perf;
cout << "SendRate(Mb/s)\tRTT(ms)\tCWnd\tPktSndPeriod(us)\tRecvACK\tRecvNAK" << endl;
while (true)
{
#ifndef WIN32
sleep(1);
#else
Sleep(1000);
#endif
if (UDT::ERROR == UDT::perfmon(u, &perf))
{
cout << "perfmon: " << UDT::getlasterror().getErrorMessage() << endl;
break;
}
cout << perf.mbpsSendRate << "\t\t"
<< perf.msRTT << "\t"
<< perf.pktCongestionWindow << "\t"
<< perf.usPktSndPeriod << "\t\t\t"
<< perf.pktRecvACK << "\t"
<< perf.pktRecvNAK << endl;
}
#ifndef WIN32
return NULL;
#else
return 0;
#endif
}
#ifndef WIN32
#include <unistd.h>
#include <cstdlib>
#include <cstring>
#include <netdb.h>
#else
#include <winsock2.h>
#include <ws2tcpip.h>
#include <wspiapi.h>
#endif
#include <iostream>
#include <udt.h>
#include "cc.h"
#include "test_util.h"
using namespace std;
#ifndef WIN32
void* recvdata(void*);
#else
DWORD WINAPI recvdata(LPVOID);
#endif
/**
 * appserver entry point.
 *
 * Binds a UDT socket to the given port (default "9000"), listens with a
 * backlog of 10 and, for every accepted connection, starts a detached
 * receiver thread running recvdata.
 *
 * Usage: appserver [server_port]
 *
 * @return 0 in every case; failures are only reported on stdout.
 */
int main(int argc, char* argv[])
{
    if ((1 != argc) && ((2 != argc) || (0 == atoi(argv[1]))))
    {
        cout << "usage: appserver [server_port]" << endl;
        return 0;
    }

    // Automatically start up and clean up UDT module
    // (scope guard; its definition lives in test_util.h).
    UDTUpDown _udt_;

    addrinfo hints;
    addrinfo* res;

    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    //hints.ai_socktype = SOCK_DGRAM;

    string service("9000");
    if (2 == argc)
        service = argv[1];

    if (0 != getaddrinfo(NULL, service.c_str(), &hints, &res))
    {
        cout << "illegal port number or port is busy.\n" << endl;
        return 0;
    }

    // Create the UDT socket.
    UDTSOCKET serv = UDT::socket(res->ai_family, res->ai_socktype, res->ai_protocol);

    // UDT Options
    //UDT::setsockopt(serv, 0, UDT_CC, new CCCFactory<CUDPBlast>, sizeof(CCCFactory<CUDPBlast>));
    //UDT::setsockopt(serv, 0, UDT_MSS, new int(9000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDT_RCVBUF, new int(10000000), sizeof(int));
    //UDT::setsockopt(serv, 0, UDP_RCVBUF, new int(10000000), sizeof(int));

    // Bind the socket to the resolved local address.
    if (UDT::ERROR == UDT::bind(serv, res->ai_addr, res->ai_addrlen))
    {
        cout << "bind: " << UDT::getlasterror().getErrorMessage() << endl;
        // Release the resolved address on the failure path as well.
        freeaddrinfo(res);
        return 0;
    }

    freeaddrinfo(res);

    cout << "server is ready at port: " << service << endl;

    if (UDT::ERROR == UDT::listen(serv, 10))
    {
        cout << "listen: " << UDT::getlasterror().getErrorMessage() << endl;
        return 0;
    }

    sockaddr_storage clientaddr;
    UDTSOCKET recver;

    while (true)
    {
        // accept() receives addrlen by pointer, so restore the full buffer
        // size before every call rather than reusing what the previous call left.
        int addrlen = sizeof(clientaddr);

        if (UDT::INVALID_SOCK == (recver = UDT::accept(serv, (sockaddr*)&clientaddr, &addrlen)))
        {
            cout << "accept: " << UDT::getlasterror().getErrorMessage() << endl;
            return 0;
        }

        char clienthost[NI_MAXHOST];
        char clientservice[NI_MAXSERV];
        getnameinfo((sockaddr *)&clientaddr, addrlen, clienthost, sizeof(clienthost), clientservice, sizeof(clientservice), NI_NUMERICHOST|NI_NUMERICSERV);
        cout << "new connection: " << clienthost << ":" << clientservice << endl;

        // The receiver thread gets its own heap copy of the socket handle and
        // frees it itself (see recvdata).
#ifndef WIN32
        pthread_t rcvthread;
        pthread_create(&rcvthread, NULL, recvdata, new UDTSOCKET(recver));
        pthread_detach(rcvthread);
#else
        CreateThread(NULL, 0, recvdata, new UDTSOCKET(recver), 0, NULL);
#endif
    }

    // Not reached: the loop above only exits through `return`.
    UDT::close(serv);

    return 0;
}
#ifndef WIN32
void* recvdata(void* usocket)
#else
DWORD WINAPI recvdata(LPVOID usocket)
#endif
{
UDTSOCKET recver = *(UDTSOCKET*)usocket;
delete (UDTSOCKET*)usocket;
char* data;
int size = 100000;
data = new char[size];
while (true)
{
int rsize = 0;
int rs;
while (rsize < size)
{
if (UDT::ERROR == (rs = UDT::recv(recver, data + rsize, size - rsize, 0)))//接收数据
{
cout << "recv:" << UDT::getlasterror().getErrorMessage() << endl;
break;
}
rsize += rs;
}
if (rsize < size)
break;
}
delete [] data;
UDT::close(recver);//断开连接
#ifndef WIN32
return NULL;
#else
return 0;
#endif
}
初始化操作之UDTUpDown _udt_分析,它的声明位于test_util.h文件中:
#ifndef _UDT_TEST_UTIL_H_
#define _UDT_TEST_UTIL_H_
// Scope guard for the UDT library: constructing an instance calls
// UDT::startup(), destroying it calls UDT::cleanup().
struct UDTUpDown
{
    // Initialize the UDT library.
    UDTUpDown() { UDT::startup(); }

    // Release the UDT library.
    ~UDTUpDown() { UDT::cleanup(); }
};
#endif
// Public UDT API namespace. Per the original notes it is declared in udt.h
// and defined in api.cpp. Only startup() and cleanup() are shown here; the
// remaining members are elided.
namespace UDT
{
// Starts the UDT library by forwarding to CUDT::startup() and returning its result.
int startup()
{
// Original notes: CUDT is declared in core.h; its non-static (private) member
// functions are implemented in core.cpp, while its static (public) member
// functions are implemented in api.cpp, the same file as namespace UDT.
return CUDT::startup();
}
// Shuts the UDT library down by forwarding to CUDT::cleanup() and returning its result.
int cleanup()
{
return CUDT::cleanup();
}
......
}
// Static entry point that starts the library by delegating to s_UDTUnited and
// returning its result. (Original note: declared in core.h, implemented in api.cpp.)
int CUDT::startup()
{
// s_UDTUnited is the UDT global management base; the original note describes it as a static member variable.
return s_UDTUnited.startup();
}
// Static entry point that shuts the library down by delegating to
// s_UDTUnited.cleanup() and returning its result.
int CUDT::cleanup()
{
return s_UDTUnited.cleanup();
}
s_UDTUnited.startup()函数的主要功能是开启垃圾回收线程。另外,不论上层调用几次startup函数,程序中只有一个垃圾回收线程,代码如下:
/**
 * Starts the UDT library for one more user.
 *
 * The first call performs the global initialisation (Winsock on WIN32) and
 * launches the garbage-collection thread; later calls only increase the
 * instance counter.
 *
 * @return 0 on every non-throwing path.
 * @throws CUDTException(1, 0, WSAGetLastError()) if WSAStartup fails (WIN32 only).
 */
int CUDTUnited::startup()
{
    CGuard gcinit(m_InitLock);

    // Post-increment: the comparison sees the counter as it was before this
    // call, so only a call that finds it at 0 (or below) continues past here.
    if (m_iInstanceCount++ > 0)
        return 0;

    // Global initialization code
#ifdef WIN32
    WORD wVersionRequested;
    WSADATA wsaData;
    wVersionRequested = MAKEWORD(2, 2);

    if (0 != WSAStartup(wVersionRequested, &wsaData))
        throw CUDTException(1, 0, WSAGetLastError());
#endif

    //init CTimer::EventLock

    // The garbage-collection thread is already running, so there is nothing
    // left to do. This path used to `return true` (i.e. 1), unlike the 0
    // returned by every other path of this int function.
    if (m_bGCStatus)
        return 0;

    m_bClosing = false;
#ifndef WIN32
    pthread_mutex_init(&m_GCStopLock, NULL);
    pthread_cond_init(&m_GCStopCond, NULL);
    // Launch the garbage-collection thread with this object as its argument.
    pthread_create(&m_GCThread, NULL, garbageCollect, this);
#else
    m_GCStopLock = CreateMutex(NULL, false, NULL);
    m_GCStopCond = CreateEvent(NULL, false, false, NULL);
    DWORD ThreadID;
    m_GCThread = CreateThread(NULL, 0, garbageCollect, this, 0, &ThreadID);
#endif

    // Record that the thread is running so it is not started a second time.
    m_bGCStatus = true;

    return 0;
}
垃圾回收函数garbageCollect函数如下:
// Garbage-collection thread routine. Until m_bClosing becomes true it calls
// checkBrokenSockets() (and checkTLSValue() on WIN32), then waits up to one
// second on the stop condition/event. The final cleanup after the loop is
// elided in this excerpt.
#ifndef WIN32
void* CUDTUnited::garbageCollect(void* p)
#else
DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)
#endif
{
// p is the CUDTUnited instance that startup() passes as the thread argument.
CUDTUnited* self = (CUDTUnited*)p;
CGuard gcguard(self->m_GCStopLock);
// Keep sweeping until cleanup() sets m_bClosing.
while (!self->m_bClosing)
{
self->checkBrokenSockets();
#ifdef WIN32
self->checkTLSValue();
#endif
#ifndef WIN32
// Build an absolute deadline one second from now for the timed wait.
timeval now;
timespec timeout;
gettimeofday(&now, 0);
timeout.tv_sec = now.tv_sec + 1;
timeout.tv_nsec = now.tv_usec * 1000;
// Sleep until the deadline, or until m_GCStopCond is signalled, whichever comes first.
// NOTE(review): the original annotation worried that looping about once per second costs a lot of CPU — unverified.
pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);
#else
// Wait on the stop event for at most 1000 ms.
WaitForSingleObject(self->m_GCStopCond, 1000);
#endif
}
// remove all sockets and multiplexers
......
}
s_UDTUnited.cleanup()函数的主要功能是signal垃圾回收线程,回收资源。在这里值得注意的是,如果递减之后m_iInstanceCount仍大于0(即还有其它startup调用尚未对应地调用cleanup),是不会触发signal的。代码如下:
/**
 * Releases one use of the UDT library.
 *
 * The instance counter is always decremented. The garbage-collection thread
 * is only stopped, and its synchronisation objects destroyed, when the
 * counter is no longer positive and the thread is marked as running.
 *
 * @return always 0.
 */
int CUDTUnited::cleanup()
{
    CGuard gcinit(m_InitLock);

    // Other users remain, or the GC thread was never started: nothing to
    // tear down. The decrement happens first; the status flag is only
    // examined when the counter did not stay positive.
    if ((--m_iInstanceCount > 0) || !m_bGCStatus)
        return 0;

    //destroy CTimer::EventLock

    // Ask the GC thread to leave its loop, wake it, and wait for it to end
    // before destroying the objects it uses.
    m_bClosing = true;
#ifndef WIN32
    pthread_cond_signal(&m_GCStopCond);
    pthread_join(m_GCThread, NULL);
    pthread_mutex_destroy(&m_GCStopLock);
    pthread_cond_destroy(&m_GCStopCond);
#else
    SetEvent(m_GCStopCond);
    WaitForSingleObject(m_GCThread, INFINITE);
    CloseHandle(m_GCThread);
    CloseHandle(m_GCStopLock);
    CloseHandle(m_GCStopCond);
#endif

    m_bGCStatus = false;

    // Global destruction code
#ifdef WIN32
    WSACleanup();
#endif

    return 0;
}
// Public wrapper that creates a UDT socket by forwarding all three arguments
// to CUDT::socket() and returning its result.
// Original note: the protocol argument is passed along but serves no purpose.
UDTSOCKET socket(int af, int type, int protocol)
{
return CUDT::socket(af, type, protocol);
}
// Creates a new UDT socket through s_UDTUnited.newSocket(af, type).
// The third parameter is unnamed and not used in the visible code.
UDTSOCKET CUDT::socket(int af, int type, int)
{
// Start the library on demand when the GC-status flag shows that startup() has not taken effect yet.
if (!s_UDTUnited.m_bGCStatus)
s_UDTUnited.startup();
try
{
return s_UDTUnited.newSocket(af, type);
}
// (exception handlers are elided in this excerpt)
......
}
/**
 * Creates a new UDT socket and registers it in m_Sockets.
 *
 * @param af    address family; AF_INET allocates a sockaddr_in for the local
 *              address, any other value allocates a sockaddr_in6.
 * @param type  SOCK_STREAM or SOCK_DGRAM.
 * @return the ID assigned to the new socket.
 * @throws CUDTException(5, 3, 0) if type is neither SOCK_STREAM nor SOCK_DGRAM.
 * @throws CUDTException(3, 2, 0) if creating or registering the socket fails.
 */
UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
    if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))
        throw CUDTException(5, 3, 0);

    CUDTSocket* ns = NULL;

    // Call chain visible in this file: UDT -> CUDT -> CUDTUnited, which
    // creates a CUDTSocket that owns a CUDT instance.
    try
    {
        // Every UDT::socket() call allocates a fresh socket object ...
        ns = new CUDTSocket;
        // ... together with a fresh CUDT instance for it.
        ns->m_pUDT = new CUDT;
        if (AF_INET == af)
        {
            // Storage for the socket's own (local) address.
            ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
            ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
        }
        else
        {
            ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
            ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
        }
    }
    catch (...)
    {
        delete ns;
        throw CUDTException(3, 2, 0);
    }

    // IDs are handed out by pre-decrementing the shared counter; the lock is
    // taken and released by hand so it covers only this one statement.
    CGuard::enterCS(m_IDLock);
    ns->m_SocketID = -- m_SocketID;
    CGuard::leaveCS(m_IDLock);

    // Initialise the socket object and its CUDT instance.
    ns->m_Status = INIT;
    ns->m_ListenSocket = 0;
    ns->m_pUDT->m_SocketID = ns->m_SocketID;
    ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;
    ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
    ns->m_pUDT->m_pCache = m_pCache;

    // protect the m_Sockets structure.
    // The lock is released exactly once, after the try/catch. The previous
    // code also released it inside the catch handler, so a failed insertion
    // unlocked m_ControlLock twice.
    bool registered = true;
    CGuard::enterCS(m_ControlLock);
    try
    {
        // Associate the socket ID with the socket object.
        m_Sockets[ns->m_SocketID] = ns;
    }
    catch (...)
    {
        registered = false;
    }
    CGuard::leaveCS(m_ControlLock);

    if (!registered)
    {
        //failure and rollback
        delete ns;
        throw CUDTException(3, 2, 0);
    }

    return ns->m_SocketID;
}
// Excerpt of the public UDT namespace showing only bind(); other members are elided.
namespace UDT
{
......
// Public wrapper: forwards the socket ID, address and address length to
// CUDT::bind() and returns its result.
int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)
{
return CUDT::bind(u, name, namelen);
}
......
}
// Static bind entry point: delegates to s_UDTUnited.bind() inside a try block
// and returns its result.
int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen)
{
try
{
return s_UDTUnited.bind(u, name, namelen);
}
// (exception handlers are elided in this excerpt)
......
}
/**
 * Binds UDT socket u to the local address given in name.
 *
 * @param u        socket ID.
 * @param name     local address to bind to.
 * @param namelen  size of *name; must equal sizeof(sockaddr_in) for an
 *                 AF_INET socket and sizeof(sockaddr_in6) otherwise.
 * @return 0 on success.
 * @throws CUDTException(5, 4, 0) if no socket is found for u.
 * @throws CUDTException(5, 0, 0) if the socket is not in the INIT state.
 * @throws CUDTException(5, 3, 0) if namelen does not match the address family.
 */
int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
{
    // Look up the socket object that belongs to this ID.
    CUDTSocket* s = locate(u);
    if (NULL == s)
    {
        throw CUDTException(5, 4, 0);
    }

    // Guard object tied to the socket's own control lock.
    CGuard cg(s->m_ControlLock);

    // cannot bind a socket more than once
    if (INIT != s->m_Status)
    {
        throw CUDTException(5, 0, 0);
    }

    // check the size of SOCKADDR structure
    const int expected = (AF_INET == s->m_iIPversion)
                             ? static_cast<int>(sizeof(sockaddr_in))
                             : static_cast<int>(sizeof(sockaddr_in6));
    if (namelen != expected)
    {
        throw CUDTException(5, 3, 0);
    }

    // Reset the CUDT instance's parameters and timers, attach the socket to
    // a multiplexer for this address, then mark it as opened.
    s->m_pUDT->open();
    updateMux(s, name);
    s->m_Status = OPENED;

    // copy address information of local node
    s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

    return 0;
}
void CUDT::open()//core.cpp
{
CGuard cg(m_ConnectionLock);//嵌套锁哟
// Initial sequence number, loss, acknowledgement, etc.
m_iPktSize = m_iMSS - 28;//1500-28(20+8)=1472
m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;//1472-16(packet.cpp)=1456
m_iEXPCount = 1;
m_iBandwidth = 1;
m_iDeliveryRate = 16;//包在接收端到达的速率
m_iAckSeqNo = 0;
m_ullLastAckTime = 0;
// trace information
m_StartTime = CTimer::getTime();
m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;
m_LastSampleTime = CTimer::getTime();
m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
m_llSndDuration = m_llSndDurationTotal = 0;
// structures for queue 接收队列 发送队列的初始化
if (NULL == m_pSNode)
m_pSNode = new CSNode;
m_pSNode->m_pUDT = this;
m_pSNode->m_llTimeStamp = 1;
m_pSNode->m_iHeapLoc = -1;
if (NULL == m_pRNode)
m_pRNode = new CRNode;
m_pRNode->m_pUDT = this;
m_pRNode->m_llTimeStamp = 1;
m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;
m_pRNode->m_bOnList = false;
m_iRTT = 10 * m_iSYNInterval;//10*10000=100000(100ms)
m_iRTTVar = m_iRTT >> 1;
m_ullCPUFrequency = CTimer::getCPUFrequency();
// set up the timers
m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;//10ms
// set minimum NAK and EXP timeout to 100ms
m_ullMinNakInt = 300000 * m_ullCPUFrequency;//300ms
m_ullMinExpInt = 300000 * m_ullCPUFrequency;//300ms
m_ullACKInt = m_ullSYNInt;//10ms
m_ullNAKInt = m_ullMinNakInt;//300ms
uint64_t currtime;
CTimer::rdtsc(currtime);
m_ullLastRspTime = currtime;
m_ullNextACKTime = currtime + m_ullSYNInt;
m_ullNextNAKTime = currtime + m_ullNAKInt;
m_iPktCount = 0;
m_iLightACKCount = 1;
m_ullTargetTime = 0;
m_ullTimeDiff = 0;
// Now UDT is opened.
m_bOpened = true;//改变传输控制块的状态
}
/**
 * Attaches socket s to a UDP multiplexer.
 *
 * If address reuse is enabled and an address is given, an existing reusable
 * multiplexer with the same IP version, MSS and port is shared. Otherwise a
 * new multiplexer (channel, timer, send queue, receive queue) is created and
 * stored in m_mMultiplexer under the socket's ID.
 *
 * @param s        socket to attach.
 * @param addr     local address to open the channel on; may be NULL.
 * @param udpsock  existing UDP socket to open the channel on; may be NULL
 *                 (original note: the last two parameters default to NULL).
 * @throws CUDTException rethrown when opening the channel fails.
 */
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
    CGuard cg(m_ControlLock);

    // reuse an existing port or not, for udp multiplexer
    if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
    {
        int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);

        // find a reusable address
        for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
        {
            if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable
                && (i->second.m_iPort == port))
            {
                // reuse the existing multiplexer: bump its reference count
                // and share its queues instead of creating new ones
                ++ i->second.m_iRefCount;
                s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;
                s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;
                s->m_iMuxID = i->second.m_iID;
                return;
            }
        }
    }

    // a new multiplexer is needed
    CMultiplexer m;
    m.m_iMSS = s->m_pUDT->m_iMSS;
    m.m_iIPversion = s->m_pUDT->m_iIPversion;
    m.m_iRefCount = 1;
    m.m_bReusable = s->m_pUDT->m_bReuseAddr;
    m.m_iID = s->m_SocketID;

    m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);
    m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
    m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);

    try
    {
        // Open the channel on the caller's UDP socket when one is supplied,
        // otherwise on the given address.
        if (NULL != udpsock)
            m.m_pChannel->open(*udpsock);
        else
            m.m_pChannel->open(addr);
    }
    catch (CUDTException& e)
    {
        // Opening failed: close and release the channel, then propagate the error.
        m.m_pChannel->close();
        delete m.m_pChannel;
        throw e;
    }

    // Store the port the channel actually uses, so that a later bind to the
    // same port can find this multiplexer in the reuse loop above.
    // (This explanation used to sit after the statement without a comment
    // marker, which made the line invalid C++.)
    sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
    m.m_pChannel->getSockAddr(sa);
    m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
    if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;

    // Timer plus send/receive queues for the new multiplexer. Per the
    // original notes, each init() call starts that queue's worker thread.
    m.m_pTimer = new CTimer;

    m.m_pSndQueue = new CSndQueue;
    m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
    m.m_pRcvQueue = new CRcvQueue;
    m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);

    // Register the multiplexer under its ID, which equals the socket ID.
    m_mMultiplexer[m.m_iID] = m;

    s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
    s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
    s->m_iMuxID = m.m_iID;
}
// Excerpt of the public UDT namespace showing only listen(); other members are elided.
namespace UDT
{
.....
// Public wrapper: forwards the socket ID and backlog to CUDT::listen() and
// returns its result.
int listen(UDTSOCKET u, int backlog)
{
return CUDT::listen(u, backlog);
}
......
}
// Static listen entry point: delegates to s_UDTUnited.listen() inside a try
// block and returns its result.
int CUDT::listen(UDTSOCKET u, int backlog)
{
try
{
return s_UDTUnited.listen(u, backlog);
}
// (exception handlers are elided in this excerpt)
......
}
// Puts socket u into listening mode: stores the backlog, allocates the sets
// that track queued and accepted connections, calls listen() on the socket's
// CUDT instance and sets the status to LISTENING. Returns 0 on success and
// throws CUDTException(3, 2, 0) if allocating the sets fails.
int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
// (the start of the function is elided in this excerpt; `s` is presumably the CUDTSocket found for u)
......
s->m_uiBackLog = backlog;
try
{
s->m_pQueuedSockets = new set<UDTSOCKET>;// set of connections waiting for accept()
s->m_pAcceptSockets = new set<UDTSOCKET>; // set of accept()ed connections
}
catch (...)
{
// Allocation failed: delete both set pointers and report the failure.
delete s->m_pQueuedSockets;
delete s->m_pAcceptSockets;
throw CUDTException(3, 2, 0);
}
// Switch the CUDT instance to listening first, then record the new state on the socket.
s->m_pUDT->listen();
s->m_Status = LISTENING;
return 0;
}
/**
 * Switches this CUDT instance into listening mode.
 *
 * @throws CUDTException(5, 0, 0)  if the instance has not been opened.
 * @throws CUDTException(5, 2, 0)  if it is connecting or already connected.
 * @throws CUDTException(5, 11, 0) if the receive queue refuses it as listener.
 */
void CUDT::listen()
{
    CGuard cg(m_ConnectionLock);

    if (!m_bOpened)
    {
        throw CUDTException(5, 0, 0);
    }

    if (m_bConnecting || m_bConnected)
    {
        throw CUDTException(5, 2, 0);
    }

    // listen can be called more than once; only the first call registers
    if (!m_bListening)
    {
        // if there is already another socket listening on the same port
        if (m_pRcvQueue->setListener(this) < 0)
        {
            throw CUDTException(5, 11, 0);
        }

        m_bListening = true;
    }
}