套接字I/O Select模型的“中心思想”便是利用select函数,实现对I/O的管理。利用select函数判断套接字(一个或多个)上是否存在数据,或者能否向套接字写入数据。它也是同步的,也会阻塞。但和套接字I/O阻塞模型不同的是,Select模型可以同时管理多个Socket。
select函数原型:
int select (
int nfds,
fd_set FAR * readfds,
fd_set FAR * writefds,
fd_set FAR * exceptfds,
const struct timeval FAR * timeout
);
第一个参数nfds会被忽略。之所以仍然要提供这个参数,只是为了保持与早期的Berkeley套接字应用程序的兼容。
三个fd_set参数:一个用于检查可读性(readfds),一个用于检查可写性(writefds),另一个用于例外数据( excepfds)。
从根本上说,fdset数据类型代表着一系列特定套接字的集合。
其中,readfds集合包括符合下述任何一个条件的套接字:
* 有数据可以读入。
* 连接已经关闭、重设或中止。(可以用来判断客户端或服务端程序是否退出了)
* 假如已调用了listen,而且一个连接正在建立,那么accept函数调用会成功。
writefds集合包括符合下述任何一个条件的套接字:
* 有数据可以发出。
* 如果已完成了对一个非锁定连接调用的处理,连接就会成功。
最后,exceptfds集合包括符合下述任何一个条件的套接字:
* 假如已完成了对一个非锁定连接调用的处理,连接尝试就会失败。
* 有带外(out-of-band,OOB)数据可供读取。
例如,假定我们想测试一个套接字是否“可读”,必须将自己的套接字增添到readfds集合,再等待select函数完成。select完成之后,必须判断自己的套接字是否仍为readfds集合的一部分。若答案是肯定的,便表明该套接字“可读”,可立即着手从它上面读取数据。在三个参数中(readfds、writedfss和exceptfds),任何两个都可以是空值(NULL);但是,至少有一个不能为空值!在任何不为空的集合中,必须包含至少一个套接字句柄;否则,select函数便没有任何东西可以等待。
最后一个参数timeout对应的是一个指针,它指向一个timeval结构,用于决定select最多等待I/O操作完成多久的时间。如timeout是一个空指针,那么select调用会无限期地“锁定”或停顿下去,直到至少有一个描述符符合指定的条件后结束。对timeval结构的定义如下:
struct timeval {
long tv_sec;
long tv_usec;
} ;
若将超时值设置为(0,0),表明select会立即返回,允许应用程序对select操作进行“轮询”。出于对性能方面的考虑,应避免这样的设置。
select成功完成后,会在fd_set结构中,返回刚好有未完成的I/O操作的所有套接字句柄的总量。
若超过timeval设定的时间,便会返回0。
不管由于什么原因,假如select调用失败,都会返回SOCKET_ERROR。
用select对套接字进行监视之前,在自己的应用程序中,必须将套接字句柄分配给一个集合,设置好一个或全部读、写以及例外fd_set结构。将一个套接字分配给任何一个集合后,再来调用select,便可知道一个套接字上是否正在发生上述的I/O活动。
Winsock提供了下列宏操作,对fd_set进行处理与检查:
FD_CLR(s, *set); 从set中删除套接字s。
FD_ISSET(s, *set); 检查s是否set集合的一名成员;返回TRUE或FALSE。
FD_SET(s, *set); 将套接字s加入集合set。
FD_ZERO(*set); 将set初始化成空集合。
服务端代码比之前介绍的“套接字I/O 阻塞模型”中的示例稍微复杂一点。可以管理多个客户端的连接,对每个新的客户端发送“Hello,I’m server”问候信息,在客户端程序退出时自动关闭连接等功能。
#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>
using namespace std;
#pragma comment(lib, "Ws2_32.lib")
const u_short kPort = 10001;
const std::string kHelloServer = "hello, I'm server.";
int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);
SOCKET socket_ = INVALID_SOCKET;
std::vector<SOCKET> clients;
do
{
// (1)
socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
// (2)
struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(kPort);
if (bind(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
std::cout << "bind failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
// (3)
if (listen(socket_, 5) == SOCKET_ERROR) {
std::cout << "listen failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
std::cout << "listen on port: " << kPort << std::endl;
fd_set fd_read;
while (true) // TODO 未处理何时退出的问题
{
// (4)
FD_ZERO(&fd_read);
FD_SET(socket_, &fd_read);
for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it)
FD_SET(*it, &fd_read);
// (5)
timeval timeout = { 3, 0 };
int ret = select(0, &fd_read, NULL, NULL, &timeout);
if (ret == SOCKET_ERROR) {
std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
// (6.1)
// 检查服务端的监听socket,是否有新的连接被搁置
if (FD_ISSET(socket_, &fd_read)) {
// (7.1)
struct sockaddr_in addr_c = { 0 };
int addr_len = sizeof(addr_c);
SOCKET s = accept(socket_, reinterpret_cast<sockaddr*>(&addr_c), &addr_len);
if (s == SOCKET_ERROR) {
std::cout << "accept failed, GLE: " << WSAGetLastError() << std::endl;
}
else {
clients.push_back(s);
std::cout << "new connection" << std::endl;
int left = kHelloServer.length();
int idx = 0;
while (left > 0) {
int err = send(s, (const char*)(kHelloServer.c_str() + idx), left, 0);
if (err == SOCKET_ERROR) {
std::cout << "send failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
left -= err;
idx += err;
std::cout << "bytes sent: " << err << std::endl;
}
}
}
// (6.2)
// 检查与客户端连接的socket,是否有数据可以读入
for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ) {
if (FD_ISSET(*it, &fd_read)) {
// (7.2)
char buf[100] = { 0 };
int err = recv(*it, buf, 100, 0);
if (err > 0) {
std::cout << "recv: " << buf << std::endl;
++it;
}
else if (err == 0) {
std::cout << "connection closed." << std::endl;
closesocket(*it);
it = clients.erase(it);
}
else {
std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
closesocket(*it);
it = clients.erase(it);
}
}
else {
++it;
}
}
} // while
} while (false);
// (8)
for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it) {
closesocket(*it);
}
// (9)
closesocket(socket_);
WSACleanup();
return 0;
}
客户端在连接上服务端之后,不间断的接收服务端的消息。
#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>
using namespace std;
#pragma comment(lib, "Ws2_32.lib")
const std::string kIP = "127.0.0.1";
const u_short kPort = 10001;
const std::string kHelloClient = "hello, I'm client.";
int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);
SOCKET socket_ = INVALID_SOCKET;
do
{
// (1)
socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
// (2)
struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(kIP.c_str());
addr.sin_port = htons(kPort);
if (connect(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
std::cout << "connect failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
fd_set fd_read;
while (true) // TODO 未处理何时退出的问题
{
// (3)
FD_ZERO(&fd_read);
FD_SET(socket_, &fd_read);
// (4)
timeval timeout = { 3, 0 };
int ret = select(0, &fd_read, NULL, NULL, &timeout);
if (ret == SOCKET_ERROR) {
std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
// (5)
if (FD_ISSET(socket_, &fd_read)) {
char buf[100] = { 0 };
int err = recv(socket_, buf, 100, 0);
if (err > 0) {
std::cout << "recv: " << buf << std::endl;
}
else if (err == 0) {
std::cout << "connection closed." << std::endl;
break;
}
else {
std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
}
} // while
} while (false);
// (6)
closesocket(socket_);
WSACleanup();
return 0;
}