与poll
模式的事件宏相比,epoll
模式新增了一个事件宏EPOLLET
,即边缘触发模式。默认模式为水平触发模式。
这两种模式的区别如下:
以socket
的读事件为例,对于水平模式,只要在socket
上有未读完的数据,就会一直产生EPOLLIN
事件;而对于边缘模式,socket
上每新来以此数据就会触发一次,如果上一次触发后未将socket
上的数据读完,也不会再触发,除非再新来一次数据。对于socket
写事件,如果socket
的TCP窗口一直不饱和,就会一直触发EPOLLOUT
事件;而对于边缘模式,只会触发一次,除非TCP窗口由不饱和变成饱和再一次变成不饱和,才会再次触发EPOLLOUT
事件。
socket
可读事件的水平模式触发条件:
socket
上无数据->socket
上有数据socket
处于有数据状态socket
可读事件的边缘触发模式条件:
socket
上无数据->socket
上有数据socket
又来一次数据socket
可写事件的水平模式触发条件:
socket
可写->socket
不可写socket
不可写->socket
可写socket
可写的边缘触发条件:
socket
不可写->socket
可写也就是说,对于一个阻塞socket
,如果使用epoll
边缘模式检测数据是否可读,则触发可读事件后,一定要一次性的把socket
上的数据收取干净。也就是说,一定要循环调用recv
函数直到recv
出错,错误码是EWOULDBLOCK
(EAGAIN也一样,此时表示socket
上的本次数据已经读完);如果使用水平模式,则我们可以根据业务一次性地收取固定的字节数,或者到收完为止。
bool TcpSession::RecvEtMode()
{
//每次收取256字节
char buff[256];
while(true)
{
int nRecv = ::recv(m_clientfd, buff, 256, 0);
if(nRecv == -1)
{
if(errno == EWOULDBLOCK)
return true;
else if(errno == EINTR)
continue;
return false;
}
//对端关闭了socket
else if(nRecv == 0)
return false;
m_inputBuffer.add(buff, (size_t)nRecv);
}
return true;
}
//epoll_server.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <poll.h>
#include <string.h>
#include <vector>
#include <errno.h>
#include <iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if(listenfd == -1)
{
std::cout<<"create listen socket error" << std::endl;
return -1;
}
//设置重用IP地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));
//将监听socket设置为非阻塞
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器的地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if(bind(listenfd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) == -1)
{
std::cout<< "bind listen socket error" << std::endl;
close(listenfd);
return -1;
}
//启动监听
if(listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if(epollfd == -1)
{
std::cout<<"create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//若取消注释这一行,则使用ET模式
//listen_fd_event.events |= EPOLLIN;
//将监听socket绑定到epollfd上
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while(true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if(n < 0)
{
//被信号中断
if(errno == EINTR)
continue;
//出错,退出
break;
}
else if(n ==0)
{
//超时
continue;
}
for(size_t i = 0; i < n; ++i)
{
//事件可读
if(epoll_events[i].events & EPOLLIN)
{
if(epoll_events[i].data.fd == listenfd)
{
//监听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clientaddrlen);
if(clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout<<"set clientfd to nonblocking error" << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
client_fd_event.events = EPOLLIN;
//若取消注释这一行,则使用ET模式
//client_fd_event.events |= EPOLLET;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event)!= -1)
{
std::cout<<"new client accept, clientfd: " << clientfd << std::endl;
}
else
{
std::cout<<"add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << "recv data." << std::endl;
//普通clientfd
char ch;
//每次只接受1字节
int m = recv(epoll_events[i].data.fd, &ch, 1, 0);
if(m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout<<"client disconnected,clientfd" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if(m < 0)
{
//出错
if(errno != EWOULDBLOCK && errno != EINTR)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout<<"client disconnected,clientfd" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout<<"recv from client:"<<epoll_events[i].data.fd<<", " << ch << std::endl;
}
}
}
else if(epoll_events[i].events & EPOLLERR)
{
//不处理
}
}
}
close(listenfd);
return 0;
}
服务器的代码:
kkk@kkk-VirtualBox:~/code$ g++ -g -o epoll_server epoll_server.cpp
kkk@kkk-VirtualBox:~/code$ ./epoll_server
客户端的代码:
kkk@kkk-VirtualBox:~$ nc -v 127.0.0.1 3000
Connection to 127.0.0.1 3000 port [tcp/*] succeeded!
abcdef
此时服务器输出
kkk@kkk-VirtualBox:~/code$ g++ -g -o epoll_server epoll_server.cpp
kkk@kkk-VirtualBox:~/code$ ./epoll_server
new client accept, clientfd: 5
client fd: 5recv data.
recv from client:5, a
client fd: 5recv data.
recv from client:5, b
client fd: 5recv data.
recv from client:5, c
client fd: 5recv data.
recv from client:5, d
client fd: 5recv data.
recv from client:5, e
client fd: 5recv data.
recv from client:5, f
client fd: 5recv data.
recv from client:5,
nc
命令实际发送了a,b,c,d,e,f和\n
这7个字符,由于服务端使用的是LT模式,每次接收一个字符,只要socket
接收缓冲区中仍有数据可读,POLLIN
事件就会一直触发,所以服务器一共有7次输出,直到socket
接收缓冲区没有数据时为止。
把上述代码的注释去掉。使用ET模式。
kkk@kkk-VirtualBox:~/code$ gedit epoll_server.cpp
kkk@kkk-VirtualBox:~/code$ g++ -g -o epoll_server epoll_server.cpp
kkk@kkk-VirtualBox:~/code$ ./epoll_server
new client accept, clientfd: 5
client fd: 5recv data.
recv from client:5, a
由于使用了使用了ET模式,所以只会触发一次POLLIN
事件,如果此时没有新数据到来,就再也不会触发。所以,如果我们继续向服务器发送一条新数据加123,则服务器将再次触发一次EPOLLIN
事件,然后打印出字母b。
服务器输出端如下:
kkk@kkk-VirtualBox:~/code$ ./epoll_server
new client accept, clientfd: 5
client fd: 5recv data.
recv from client:5, a
client fd: 5recv data.
recv from client:5, b
所以,使用ET模式处理读事件,切记要将该次socket上的数据收完。
//epoll_server_write_event_lt.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <vector>
#include <errno.h>
#include <iostream>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
if(listenfd == -1)
{
std::cout << "create listen socket error" << std::endl;
return -1;
}
//设置重用IP地址和端口号
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if(bind(listenfd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socket error" << std::endl;
close(listenfd);
return -1;
}
//启动监听
if(listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if(epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//若取消注释这一行,则使用ET模式
//listen_fd_event.events |= EPOLLET;
//将监听socket绑定到epollfd上
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while(true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if(n < 0)
{
//被信号中断
if(errno == EINTR)
continue;
//出错,退出
break;
}
else if(n == 0)
{
//超时,继续
continue;
}
for(size_t i = 0; i < n; i++)
{
//事件可读
if(epoll_events[i].events & EPOLLIN)
{
if(epoll_events[i].data.fd == listenfd)
{
//监听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr*)&clientaddr, &clientaddrlen);
if(clientfd != -1)
{
int oldSocketFlag = fcntl(clientfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
//同时监听新来的连接socket的读和写事件
client_fd_event.events = EPOLLIN | EPOLLOUT;
//取消注释这一行时,使用ET模式
//client_fd_event.events |= EPOLLET;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accepted, clientfd: " << clientfd << std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
//普通clientfd
char recvbuf[1024] = {0};
//每次只接受1字节
int m = recv(epoll_events[i].data.fd, recvbuf, 1024, 0);
if(m == 0)
{
//对端关闭了连接,从epollfd上移除clientfd
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected, clientfd" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if(m < 0)
{
//出错
if(errno != EWOULDBLOCK && errno != EINTR)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected, clientfd" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << recvbuf << std::endl;
}
}
}
else if(epoll_events[i].events & EPOLLOUT)
{
//只处理客户端fd的可写事件
if(epoll_events[i].data.fd != listenfd)
{
std::cout << "EPOLLOUT triggered, clientfd: " << epoll_events[i].data.fd << std::endl;
}
}
else if (epoll_events[i].events & EPOLLERR)
{}
}
}
close(listenfd);
return 0;
}
以上代码中,我们对新来的连接fd
同时注册读和写事件,再次编译程序并运行。
kkk@kkk-VirtualBox:~/code$ g++ -g -o epoll_server_write_event_lt epoll_server_write_event_lt.cpp
kkk@kkk-VirtualBox:~/code$ ./epoll_server_write_event_lt
然后用nc
命令模拟一个客户端去连接epoll_server_write_event_lt
kkk@kkk-VirtualBox:~/桌面$ nc -v 127.0.0.1 3000
Connection to 127.0.0.1 3000 port [tcp/*] succeeded!
此时服务端会疯狂输出可写事件的触发消息
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
EPOLLOUT triggered, clientfd: 5
因为我们注册了可写事件且使用的是LT模式。在LT模式下,由于这里的服务端对应的客户端fd
一直是可写的,有写事件一直触发,所以会看到屏幕不断输出。
//epoll_server_write_event_et.cpp
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <poll.h>
#include <iostream>
#include <string.h>
#include <vector>
#include <errno.h>
int main()
{
//创建一个监听socket
int listenfd = socket(AF_INET,SOCK_STREAM, 0);
if(listenfd == -1)
{
std::cout<<"create listen socket error" << std::endl;
return -1;
}
//设置重用IP地址和端口号
int on = 1;
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, (char*)&on, sizeof(on));
//将监听socket设置为非阻塞的
int oldSocketFlag = fcntl(listenfd, F_GETFL, 0);
int newSocketFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(listenfd, F_SETFL, newSocketFlag) == -1)
{
close(listenfd);
std::cout << "set listenfd to nonblock error" << std::endl;
return -1;
}
//初始化服务器的地址
struct sockaddr_in bindaddr;
bindaddr.sin_family = AF_INET;
bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
bindaddr.sin_port = htons(3000);
if(bind(listenfd, (struct sockaddr*)&bindaddr, sizeof(bindaddr)) == -1)
{
std::cout << "bind listen socket error" << std::endl;
close(listenfd);
return -1;
}
//启动监听
if(listen(listenfd, SOMAXCONN) == -1)
{
std::cout << "listen error." << std::endl;
close(listenfd);
return -1;
}
//创建epollfd
int epollfd = epoll_create(1);
if(epollfd == -1)
{
std::cout << "create epollfd error." << std::endl;
close(listenfd);
return -1;
}
epoll_event listen_fd_event;
listen_fd_event.data.fd = listenfd;
listen_fd_event.events = EPOLLIN;
//若取消注释这一行,则使用ET模式
//listen_fd_event.events |= EPOLLET;
//将监听socket绑定到epollfd上
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &listen_fd_event) == -1)
{
std::cout << "epoll_ctl error" << std::endl;
close(listenfd);
return -1;
}
int n;
while(true)
{
epoll_event epoll_events[1024];
n = epoll_wait(epollfd, epoll_events, 1024, 1000);
if(n < 0)
{
//被信号中断
if(errno == EINTR)
continue;
//出错退出
break;
}
else if(n == 0)
{
//超时,继续
continue;
}
for(size_t i = 0; i < n; i++)
{
//有读事件
if(epoll_events[i].events & EPOLLIN)
{
if(epoll_events[i].data.fd == listenfd)
{
//监听socket,接受新连接
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd,(struct sockaddr*)&clientaddr, &clientaddrlen);
if(clientfd != -1)
{
int oldSockFlag = fcntl(clientfd, F_GETFL, 0);
int newSockFlag = oldSocketFlag | O_NONBLOCK;
if(fcntl(clientfd, F_SETFL, newSocketFlag) == -1)
{
close(clientfd);
std::cout << "set clientfd to nonblocking error." << std::endl;
}
else
{
epoll_event client_fd_event;
client_fd_event.data.fd = clientfd;
//同时监听新来连接socket的读和写时间
client_fd_event.events = EPOLLIN | EPOLLOUT;
//使用ET模式
client_fd_event.events |= EPOLLET;
if(epoll_ctl(epollfd, EPOLL_CTL_ADD, clientfd, &client_fd_event) != -1)
{
std::cout << "new client accept, clientfd: "<<clientfd<<std::endl;
}
else
{
std::cout << "add client fd to epollfd error" << std::endl;
close(clientfd);
}
}
}
}
else
{
std::cout << "client fd: " << epoll_events[i].data.fd << " recv data." << std::endl;
char recvbuf[1024] = { 0 };
int m = recv(epoll_events[i].data.fd, recvbuf, 1024, 0);
if(m == 0)
{
//对端关闭了连接,从epollfd上移除了clientfd
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected, clientfd: " << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
else if(m < 0)
{
//出错
if(errno != EWOULDBLOCK && errno != EINTR)
{
if(epoll_ctl(epollfd, EPOLL_CTL_DEL, epoll_events[i].data.fd, NULL) != -1)
{
std::cout << "client disconnected, client:" << epoll_events[i].data.fd << std::endl;
}
close(epoll_events[i].data.fd);
}
}
else
{
//正常收到数据
std::cout << "recv from client:" << epoll_events[i].data.fd << ", " << recvbuf << std::endl;
epoll_event client_fd_event;
client_fd_event.data.fd = epoll_events[i].data.fd;
//再次给clientfd注册检测可写事件
client_fd_event.events = EPOLLIN | EPOLLOUT|EPOLLET;
if(epoll_ctl(epollfd, EPOLL_CTL_MOD, epoll_events[i].data.fd, &client_fd_event) != -1)
{
std::cout << "epoll _ctl successfully ,mode : EPOLL_CTL_MOD, clientfd:" << epoll_events[i].data.fd << std::endl;
}
}
}
}
else if(epoll_events[i].events & EPOLLOUT)
{
//只处理客户端fd写事件
if(epoll_events[i].data.fd != listenfd)
{
std::cout << "EPOLLOUT triggered, clientfd: " << epoll_events[i].data.fd << std::endl;
}
}
else if(epoll_events[i].events & EPOLLERR)
{}
}
}
close(listenfd);
return 0;
}
以上逻辑中,服务端在每次收到客户端消息时都会重新给客户端fd
注册检测可写事件EPOLLOUT
,重新编译代码并启动服务。
nc
客户端的执行结果如下
kkk@kkk-VirtualBox:~/桌面$ nc -v 127.0.0.1 3000
Connection to 127.0.0.1 3000 port [tcp/*] succeeded!
msg1
msg2
msg3
epoll_server_write_event_et
服务端的执行结果如下:
kkk@kkk-VirtualBox:~/code$ ./epoll_server_write_event_et
new client accept, clientfd: 5
EPOLLOUT triggered, clientfd: 5
client fd: 5 recv data.
recv from client:5, msg1
epoll _ctl successfully ,mode : EPOLL_CTL_MOD, clientfd:5
EPOLLOUT triggered, clientfd: 5
client fd: 5 recv data.
recv from client:5, msg2
epoll _ctl successfully ,mode : EPOLL_CTL_MOD, clientfd:5
EPOLLOUT triggered, clientfd: 5
client fd: 5 recv data.
recv from client:5, msg3
epoll _ctl successfully ,mode : EPOLL_CTL_MOD, clientfd:5
EPOLLOUT triggered, clientfd: 5
通过以上输出,可以发现,当使用ET模式下,即使服务端给客户端fd
注册了检测可写事件,可写事件也不会一直触发,只会触发一次,触发完成后只有再次注册,检测可写事件时,可写事件才会继续触发。在epoll_server_write_event_et
服务中是靠客户端来新消息驱动再次注册,检测可写事件的。也就是说,我们使用ET模式去处理可写事件,不必像LT模式那样为了避免不必要的可写事件重复触发,在可写事件触发后,如果不再需要,则应该立即移除对可写事件的注册。
这就意味着,使用LT模式时,如果我们的实现依赖于可写事件触发去发送数据,那么我们一定要在数据发送完成后移除检测可写事件,避免没有数据发送时无意义地触发。使用ET模式时,如果我们的实现也依赖于可写事件触发去发送数据,在可写事件触发后调用send
函数发送数据,则如果数据本次不能全部发送完成(对于非阻塞的socket
,此时send
函数返回-1,错误码为EAGAIN或EWOULDBLOCK),则一定要继续注册,检测可写事件,否则我们剩余的数据就再也没有机会发送了,因为ET模式的可写事件再也不会被触发。
recv
或者read
函数返回-1,错误码为EAGAIN
或EWOULDBLOCK
);在ET模式下,读事件时必须把数据收取干净,因为我们不一定再有机会收取数据了,即使有机会,也可能因为没有及时处理上次没读完的数据,造成客户端响应延迟。socket
)或何时接收连接(对于监听socket
),但是可能会导致多次触发;使用ET模式,我们必须每次都将数据收完,或立即调用accept
接收连接(对于监听socket
),其优点是触发次数少。