在前一篇文章中描述 UDT socket 的最前面提到了 UDT epoll,但是并没有分析其源码。在这篇文章中将集中分析 UDT 中的网络I/O复用的源码,看看与传统的 select 和 epoll 是否有什么不同。
I/O 复用的文章与例子很多,有时间可以再写介绍。常见的I/O多路复用技术有 select 和 epoll,还有 windows 中常用的 IOCP,它们的的出现的唯一目的就是解决大量I/O操作时的阻塞问题,使得可以在 I/O 可用时及时得到通知,例如服务器上大量并发连接进行网络通信场景。
Select 的问题有两点,一是连接具有数量上限,二是轮询带来的循环开销,更重要的问题是每次调用 select 都需要向操作系统传递监视对象信息。为了解决每次调用都需要系统中断的问题,才有了更进一步的解决方案,比如 windows 下的 IOCP, linux 下的 epoll等,这种处理方式必须操作系统支持,支持的方式与力度不同也带来不同的方案,当然其中 IOCP 属于更彻底的异步方式。
Select 在连接数量较少,或者需要程序保持跨平台的兼容性时其实也是具有优势的。select 为 POSIX 标准中的,而 epoll 为 linux 所特有的。不管是 select 还是 epoll 使用I/O复用也并非是提高每路连接的效率,更多的是为了支持大量连接的并发处理。
Epoll 中最重要的三个函数,
- epoll_create: 创建保存 epoll 文件描述符空间;
- epoll_ctl: 向空间注册并操作文件描述符;
- epoll_wait: 等待文件描述符发生变化。
Select 中直接声明了 fd_set,用来保存监视对象文件描述符。epoll 则通过epoll_create 向操作系统申请创建保存文件描述符的空间。为了添加或删除监视对象文件描述符,select 通过 FD_SET,FD_CLR 函数实现,epoll 则通过 epoll_ctl 实现,通过操作符请求操作系统完成操作。select 通过遍历 fd_set 变量查看监视对象是否有事件发生,epoll 则是通过 epoll_event 汇聚发生变化的文件描述符。
epoll event op:
- EPOLL_CTL_ADD
- Register the target file descriptor fd on the epoll instance referred to by the file descriptor epfd and associate the event event with the internal file linked to fd.
- EPOLL_CTL_MOD
- Change the event event associated with the target file descriptor fd.
- EPOLL_CTL_DEL
- Remove (deregister) the target file descriptor fd from the epoll instance referred to by epfd. The event is ignored and can be NULL (but see BUGS below).
使用示例:
#define MAX_EVENTS 10
struct epoll_event ev, events[MAX_EVENTS];
int listen_sock, conn_sock, nfds, epollfd;
/* Code to set up listening socket, 'listen_sock',
(socket(), bind(), listen()) omitted */
epollfd = epoll_create1(0);
if (epollfd == -1) {
perror("epoll_create1");
exit(EXIT_FAILURE);
}
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
}
for (;;) {
nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
for (n = 0; n < nfds; ++n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(EXIT_FAILURE);
}
setnonblocking(conn_sock);
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(EXIT_FAILURE);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
- UDT::epoll_create -> CUDT::epoll_create -> CUDTUnited::epoll_create -> CEPoll::create
UDT epoll 的创建实际上就是系统的 epoll_create 函数,每次创建会生成一个 epoll 的描述结构,存入一个map中。其中有epoll本身的描述符,以及 epoll 的id,也是在map中的key。
int CEPoll::create()
{
CGuard pg(m_EPollLock);
int localid = 0;
#ifdef LINUX
localid = epoll_create(1024); //创建 epoll 例程
if (localid < 0)
throw CUDTException(-1, 0, errno);
#else
// on BSD, use kqueue
// on Solaris, use /dev/poll
// on Windows, select
#endif
if (++ m_iIDSeed >= 0x7FFFFFFF)//每次累加,实际就是后面的 m_iID
m_iIDSeed = 0;
CEPollDesc desc;
desc.m_iID = m_iIDSeed;
desc.m_iLocalID = localid; //epoll 的文件描述符
m_mPolls[desc.m_iID] = desc; //保存epoll信息存入map,非指针
return desc.m_iID;
}
- CUDT::epoll_add_usock -> CUDTUnited::epoll_add_usock -> CEPoll::add_usock
- CUDT::epoll_add_ssock -> CUDTUnited::epoll_add_ssock -> CEPoll::add_ssock
- CUDT::epoll_remove_ssock -> CUDTUnited::epoll_remove_ssock -> CEPoll::remove_ssock
- CUDT::epoll_remove_ssock -> CUDTUnited::epoll_remove_ssock -> CEPoll::remove_ssock
UDT epoll 中有系统 socket 与 UDT socket 两种,分别进行处理。ssock 与传统的 epoll 一致,所以这里主要针对 usock。remove 是 add 的逆操作,也不再描述。
int CUDTUnited::epoll_add_usock(const int eid, const UDTSOCKET u, const int* events)
{
CUDTSocket* s = locate(u);
int ret = -1;
if (NULL != s)
{
ret = m_EPoll.add_usock(eid, u, events);
s->m_pUDT->addEPoll(eid);
}
else
{
throw CUDTException(5, 4);
}
return ret;
}
add_usock 根据事件的类型决定加入哪一个集合。
int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);
if (p == m_mPolls.end())
throw CUDTException(5, 13);
if (!events || (*events & UDT_EPOLL_IN))
p->second.m_sUDTSocksIn.insert(u);
if (!events || (*events & UDT_EPOLL_OUT))
p->second.m_sUDTSocksOut.insert(u);
return 0;
}
addEPoll 首先将epool 的 id 插入 m_sPollID map, 根据 buffer 中的数据更新事件。
void CUDT::addEPoll(const int eid)
{
CGuard::enterCS(s_UDTUnited.m_EPoll.m_EPollLock);
m_sPollID.insert(eid);
CGuard::leaveCS(s_UDTUnited.m_EPoll.m_EPollLock);
if (!m_bConnected || m_bBroken || m_bClosing)
return;
if (((UDT_STREAM == m_iSockType) && (m_pRcvBuffer->getRcvDataSize() > 0)) ||
((UDT_DGRAM == m_iSockType) && (m_pRcvBuffer->getRcvMsgNum() > 0)))
{
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, true);
}
if (m_iSndBufSize > m_pSndBuffer->getCurrBufSize())
{
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
}
}
UDT 的 update_events 更新事件实际上就是根据事件的类型,在 CEPollDesc 内部对应的 socket set 中对应插入。比如 UDT_EPOLL_IN 对应 m_sUDTReads,UDT_EPOLL_OUT 对应 m_sUDTWrites, UDT_EPOLL_ERR 对应 m_sUDTExcepts。 enable 如果为真,则插入,否则删除。
int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
{
CGuard pg(m_EPollLock);
map<int, CEPollDesc>::iterator p;
vector<int> lost;
for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
{
p = m_mPolls.find(*i);
if (p == m_mPolls.end())
{
lost.push_back(*i);
}
else
{
if ((events & UDT_EPOLL_IN) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);
if ((events & UDT_EPOLL_OUT) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
if ((events & UDT_EPOLL_ERR) != 0)
update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
}
}
for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
eids.erase(*i);
return 0;
}
int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
CUDTSocket* ns = NULL;
CUDTSocket* ls = locate(listen);
...
CGuard::enterCS(ls->m_AcceptLock);
ls->m_pQueuedSockets->insert(ns->m_SocketID);
CGuard::leaveCS(ls->m_AcceptLock);
// acknowledge users waiting for new connections on the listening socket
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, true);
CTimer::triggerEvent();
return 1;
}
Lisenter 在收到连接包以后,且验证通过,会建立新连接。建立连接后会通过 update_events 更新 m_sUDTWrites。
int CUDT::listen(sockaddr* addr, CPacket& packet)
{
int32_t id = hs.m_iID;
// When a peer side connects in...
if ((1 == packet.getFlag()) && (0 == packet.getType()))
{
if ((hs.m_iVersion != m_iVersion) || (hs.m_iType != m_iSockType))
{
...
}
else
{
int result = s_UDTUnited.newConnection(m_SocketID, addr, &hs);
if (result == -1)
hs.m_iReqType = 1002;
// send back a response if connection failed or connection already existed
// new connection response should be sent in connect()
if (result != 1)
{
...
}
else
{
// a new connection has been created, enable epoll for write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
}
}
}
...
}
如果处于汇合模式或者连接已经成功,会转入 POST_CONNECT。
int CUDT::connect(const CPacket& response) throw ()
{
// this is the 2nd half of a connection request. If the connection is setup successfully this returns 0.
// returning -1 means there is an error.
// returning 1 or 2 means the connection is in process and needs more handshake
if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
{
//a data packet or a keep-alive packet comes, which means the peer side is already connected
// in this situation, the previously recorded response will be used
goto POST_CONNECT;
}
...
POST_CONNECT:
...
// And, I am connected too.
m_bConnecting = false;
m_bConnected = true;
// register this socket for receiving data packets
m_pRNode->m_bOnList = true;
m_pRcvQueue->setNewEntry(this);
// acknowledge the management module.
s_UDTUnited.connect_complete(m_SocketID);
// acknowledde any waiting epolls to write
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);
return 0;
}
如果连接请求被接收,将读取新的到来的数据,需要将 socket 加入到 m_sUDTReads 。
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, int* addrlen)
{
bool accepted = false;
// !!only one conection can be set up each time!!
...
while (!accepted)
{
if (!accepted && (LISTENING == ls->m_Status))
pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
if (ls->m_pQueuedSockets->empty())
m_EPoll.update_events(listen, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
}
...
}
// write is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
// read is not available any more
s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
从代码来看,epoll 的使用主要存在代码例子中,用于C/S模式。对于 UDT 来说,参与到了所有的网络连接建立于收发过程,UDT 增加了 socket 的管理,以便更好进行读写等。