在之前写过一篇五种I/O模型,感兴趣的可以去看一下,今天主要讲其中的一种,那就是I/O多路复用。因为I/O多路复用可以使一个进程同时处理多个连接。这对提高程序的性能至关重要。对于IO复用的概念与理解在上文说的挺清楚了。本文主要说实现IO复用的系统调用。
在linux下,实现IO复用的系统调用主要有三个:select、poll 和 epoll,下面我们将对其进行逐一讲解。
##select
Select是通过将需要监听的文件描述符加入相应的文件描述符集合(readset、writeset,exceptset),由内核负责监视相应的文件描述符是否就绪。
###select API
select函数原形如下:
#include <sys/select.h>
#include <sys/time.h>
int select(int nfds,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout)
####参数
Select函数的第一个参数是nfds,是监听的文件描述符的集合中的最大文件描述符加1,他会告诉内核需要监听的文件描述符的个数。Select内部的实现是一个循环遍历1024个文件描述符,而这个nfds就是循环的上限。因为文件描述符石从0开始的,循环从0开始,[0 , nfds),所以是最大文件描述符加1。
第二个参数readset是读事件集合,第三个参数writeset是写事件集合,第四个exceptset是异常事件集合,这三个参数都是传入传出参数。用户可以将需要监听的文件描述符加入对应的集合中,若如果对某一个集合不感兴趣,就可以把它设为NULL。内核通过最这些参数的在线修改反馈其中就绪的事件。所以每次调用select都需要重置这三个参数。
这三个参数都是fd_set 结构体类型指针,fd_set结构体的定义如下:
#include <sys/select.h>
typedef long int __fd_mask;
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
typedef struct
{
/* XPG4.2 requires this member name. Otherwise avoid the name
from the global namespace. */
#ifdef __USE_XOPEN
__fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
__fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
} fd_set;
fd_set结构体的定义实际包含的是fds_bits位数组,该数组的每个元素的每一位标记一个文件描述符其大小固定,由FD_SETSIZE指定(/usr/include/bits/typesizes.h中),在当前内核中数值为1024,可见每次select系统调用可监听处理的文件描述符最大数量为1024。
由于位操作过于繁琐,所以我们采用下列一些的宏来操作该结构体中的位:
FD_ZERO(fd_set *fdset);将指定的文件描述符集清空。在对文件描述符集合进行设置前,必须对其进行初始化,如果不清空,由于在系统分配内存空间后,通常并不作清空处理,所以结果是不可知的。
**FD_SET(fd_set *fdset);**用于在文件描述符集合中增加一个新的文件描述符。
**FD_CLR(fd_set *fdset);**用于在文件描述符集合中删除一个文件描述符。
*FD_ISSET(int fd,fd_set fdset);用于测试指定的文件描述符是否在该集合中。
最后一个参数是超时时间,是一个struct timeval结构体指针,该结构体定义如下:
有两个成员,一个是秒,一个是毫秒,
struct timeval{
long tv_sec; //second
long tv_usec; //microseconds
}
超时时间可以设置到微秒级别,有三种设置情况:
NULL:阻塞等待,直到某个文件描述符上发生了事件。
0:仅检测描述符集合的状态,然后立即返回。
> 0: 指定超时时间,如果在该时间段里没有事件发生,select将超时返回。
####返回值
成功:返回就绪的文件描述符(可读、可写、异常)的总数。
超时:返回0
失败:返回-1,并设置errno
若在等待的过程中被信号打断,也返回-1,errno设置为EINTR。
####文件描述符就绪的条件
在网络编程中,下列情况下socket可读:
1、socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时可以无阻塞地读该socket,并且读操作返回的字节数大于0。
2、socket通信对方关闭连接。此时对该socket读操作将返回0。
3、监听socket上有新的连接请求。
4、socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
下列情况下socket可写:
1、socket内核发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞写该socket,并且写操作返回的字节数大于0。
2、socket写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
3、socket使用非阻塞connect连接成功或者失败(超时)之后。
4、socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
网络程序中,select能处理的异常情况只有一种:socket上接收到带外数据。
不足:Select监听的最大文件描述符受限于FD_SETSIZE,UNIX系统通常会在头文件 “sys/select.h” 中定义常量FD_SETSIZE,一般为1024,要想更改需要重新编译内核。而且因为select采取的是轮询机制,当监听的文件描述符过多的话,效率会大大折扣。
在下文中将介绍一种新的实现IO复用的函数,poll,与select相比,poll突破了最大文件描述符是1024的限制。
##程序示例
以select实现的简单回射服务器作为本文的结束。
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<sys/wait.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<ctype.h>
#define MYPORT 8888
#define BACKLOG 10
#define MAXDATASIZE 1024
int main()
{
char buf[MAXDATASIZE];
int numbytes;
int sock_fd,new_fd,connfd; //定义主动套接字和被动套接字
//定义IPV4套接口地址结构
struct sockaddr_in my_addr; //service 地址
struct sockaddr_in their_addr; //client 地址
int sin_size;
//初始化IPV4套接口地址结构
my_addr.sin_family =AF_INET; //指定该地址家族
my_addr.sin_port =htons(MYPORT); //端口
my_addr.sin_addr.s_addr = INADDR_ANY; //IPV4的地址
bzero(&(my_addr.sin_zero),8);
//socket()函数
if((sock_fd = socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket");
exit(1);
}
//地址重复利用
int on = 1;
if(setsockopt(sock_fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0)
{
perror("setsockopt");
exit(1);
}
//bind()函数
if(bind(sock_fd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1)
{
perror("bind");
exit(1);
}
//listen()函数
if(listen(sock_fd,BACKLOG)==-1)
{
perror("listen");
exit(1);
}
int i;
int client[FD_SETSIZE];//存储客户端的套接字
//初始化
for(i = 0;i < FD_SETSIZE;i++)
{
client[i] = -1;
}
int nready = 0; //接受select的返回值
int maxi = -1; //存储数组下标
int maxfd = sock_fd;//初始化nfds
fd_set rset; //定义可读事件集合
fd_set allset; //备份
FD_ZERO(&allset);
FD_SET(sock_fd,&allset);
while(1)
{
rset = allset;
nready = select(maxfd +1,&rset,NULL,NULL,NULL);
if(nready < 0)
{
perror("select error!\n");
exit(1);
}
else if(0 == nready)
{
continue;
}
//当客户端请求连接
if(FD_ISSET(sock_fd,&rset))
{
sin_size = sizeof(struct sockaddr_in);
if((new_fd=accept(sock_fd,(struct sockaddr *)&their_addr,&sin_size))==-1)
{
perror("accept");
exit(1);
}
printf("client IP: %s\t PORT : %d\n",inet_ntoa(their_addr.sin_addr),ntohs(their_addr.sin_port));
//将客户端的套接字存入client[]
for(i = 0;i < FD_SETSIZE;i++)
{
if(client[i] < 0)
{
client[i] = new_fd;
break; //找到合适位置就没必要继续遍历
}
}
//判断是否达到连接上限
if(i == FD_SETSIZE)
{
printf("too many client!\n");
break;
}
//将新加入的客户端放入监听队伍
FD_SET(new_fd,&allset);
//更新nfds
if(new_fd > maxfd)
{
maxfd = new_fd;
}
//更新maxi
if(i > maxi)
{
maxi = i;
}
//判断是否已经处理完事件
if((--nready) == 0)
{
continue;
}
}
//当客户端发送数据
for(i = 0;i <= maxi;i++)
{
if((connfd = client[i]) < 0)
{
continue;
}
if(FD_ISSET(connfd,&rset))
{
memset(buf,0,sizeof(buf));
numbytes = recv(connfd,buf,MAXDATASIZE,0);
if(numbytes == -1)
{
perror("recv\n");
exit(1);
}
else if(numbytes == 0)
{
printf("client close!");
FD_CLR(connfd,&allset);
client[i] = -1;
}
send(connfd,buf,numbytes,0);
}
if((--nready) == 0)
{
break;
}
}
}
close(sock_fd);
close(new_fd);
return 0;
}