当前位置: 首页 > 工具软件 > selecto > 使用案例 >

Windows套接字I/O模型(2) -- Select模型

萧浩漫
2023-12-01

一、Select模型介绍

套接字I/O Select模型的“中心思想”便是利用select函数,实现对I/O的管理。利用select函数判断套接字(一个或多个)上是否存在数据,或者能否向套接字写入数据。它也是同步的,也会阻塞。但和套接字I/O阻塞模型不同的是,Select模型可以同时管理多个Socket。

select函数原型:

int select (
  int nfds,                           
  fd_set FAR * readfds,               
  fd_set FAR * writefds,              
  fd_set FAR * exceptfds,             
  const struct timeval FAR * timeout  
);

第一个参数nfds会被忽略。之所以仍然要提供这个参数,只是为了保持与早期的Berkeley套接字应用程序的兼容。

三个fd_set参数:一个用于检查可读性(readfds),一个用于检查可写性(writefds),另一个用于例外数据( excepfds)。

从根本上说,fdset数据类型代表着一系列特定套接字的集合。
其中,readfds集合包括符合下述任何一个条件的套接字:
* 有数据可以读入。
* 连接已经关闭、重设或中止。(可以用来判断客户端或服务端程序是否退出了)
* 假如已调用了listen,而且一个连接正在建立,那么accept函数调用会成功。

writefds集合包括符合下述任何一个条件的套接字:
* 有数据可以发出。
* 如果已完成了对一个非锁定连接调用的处理,连接就会成功。

最后,exceptfds集合包括符合下述任何一个条件的套接字:
* 假如已完成了对一个非锁定连接调用的处理,连接尝试就会失败。
* 有带外(out-of-band,OOB)数据可供读取。

例如,假定我们想测试一个套接字是否“可读”,必须将自己的套接字增添到readfds集合,再等待select函数完成。select完成之后,必须判断自己的套接字是否仍为readfds集合的一部分。若答案是肯定的,便表明该套接字“可读”,可立即着手从它上面读取数据。在三个参数中(readfds、writedfss和exceptfds),任何两个都可以是空值(NULL);但是,至少有一个不能为空值!在任何不为空的集合中,必须包含至少一个套接字句柄;否则,select函数便没有任何东西可以等待。

最后一个参数timeout对应的是一个指针,它指向一个timeval结构,用于决定select最多等待I/O操作完成多久的时间。如timeout是一个空指针,那么select调用会无限期地“锁定”或停顿下去,直到至少有一个描述符符合指定的条件后结束。对timeval结构的定义如下:

struct timeval {
    long tv_sec;
    long tv_usec;
} ;

若将超时值设置为(0,0),表明select会立即返回,允许应用程序对select操作进行“轮询”。出于对性能方面的考虑,应避免这样的设置。
select成功完成后,会在fd_set结构中,返回刚好有未完成的I/O操作的所有套接字句柄的总量。
若超过timeval设定的时间,便会返回0。
不管由于什么原因,假如select调用失败,都会返回SOCKET_ERROR。
用select对套接字进行监视之前,在自己的应用程序中,必须将套接字句柄分配给一个集合,设置好一个或全部读、写以及例外fd_set结构。将一个套接字分配给任何一个集合后,再来调用select,便可知道一个套接字上是否正在发生上述的I/O活动。

Winsock提供了下列宏操作,对fd_set进行处理与检查:

FD_CLR(s, *set);      从set中删除套接字s。
FD_ISSET(s, *set);    检查s是否set集合的一名成员;返回TRUE或FALSE。
FD_SET(s, *set);      将套接字s加入集合set。
FD_ZERO(*set);        将set初始化成空集合。

二、示例

2.1 服务端

服务端代码比之前介绍的“套接字I/O 阻塞模型”中的示例稍微复杂一点。可以管理多个客户端的连接,对每个新的客户端发送“Hello,I’m server”问候信息,在客户端程序退出时自动关闭连接等功能。

#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const u_short kPort = 10001;
const std::string kHelloServer = "hello, I'm server.";

int main()
{
    WSADATA wsaData;
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSAStartup(wVersionRequested, &wsaData);

    SOCKET socket_ = INVALID_SOCKET;
    std::vector<SOCKET> clients;

    do
    {
        // (1)
        socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
        if (socket_ == INVALID_SOCKET) {
            std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }

        // (2)
        struct sockaddr_in addr = { 0 };
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.sin_port = htons(kPort);
        if (bind(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
            std::cout << "bind failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }

        // (3)
        if (listen(socket_, 5) == SOCKET_ERROR) {
            std::cout << "listen failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }
        std::cout << "listen on port: " << kPort << std::endl;


        fd_set fd_read;
        while (true) // TODO 未处理何时退出的问题
        {
            // (4)
            FD_ZERO(&fd_read);

            FD_SET(socket_, &fd_read);

            for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it)
                FD_SET(*it, &fd_read);

            // (5)
            timeval timeout = { 3, 0 };
            int ret = select(0, &fd_read, NULL, NULL, &timeout);
            if (ret == SOCKET_ERROR) {
                std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
                break;
            }

            // (6.1)
            // 检查服务端的监听socket,是否有新的连接被搁置
            if (FD_ISSET(socket_, &fd_read)) {

                // (7.1)
                struct sockaddr_in addr_c = { 0 };
                int addr_len = sizeof(addr_c);
                SOCKET s = accept(socket_, reinterpret_cast<sockaddr*>(&addr_c), &addr_len);
                if (s == SOCKET_ERROR) {
                    std::cout << "accept failed, GLE: " << WSAGetLastError() << std::endl;
                }
                else {
                    clients.push_back(s);
                    std::cout << "new connection" << std::endl;

                    int left = kHelloServer.length();
                    int idx = 0;
                    while (left > 0) {
                        int err = send(s, (const char*)(kHelloServer.c_str() + idx), left, 0);
                        if (err == SOCKET_ERROR) {
                            std::cout << "send failed, GLE: " << WSAGetLastError() << std::endl;
                            break;
                        }

                        left -= err;
                        idx += err;

                        std::cout << "bytes sent: " << err << std::endl;
                    }
                }
            }

            // (6.2)
            // 检查与客户端连接的socket,是否有数据可以读入
            for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ) {
                if (FD_ISSET(*it, &fd_read)) {

                    // (7.2)
                    char buf[100] = { 0 };
                    int err = recv(*it, buf, 100, 0);
                    if (err > 0) {
                        std::cout << "recv: " << buf << std::endl;

                        ++it;
                    }
                    else if (err == 0) {
                        std::cout << "connection closed." << std::endl;
                        closesocket(*it);

                        it = clients.erase(it);
                    }
                    else {
                        std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
                        closesocket(*it);

                        it = clients.erase(it);
                    }
                }
                else {
                    ++it;
                }
            }
        } // while
    } while (false);

    // (8)
    for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it) {
        closesocket(*it);
    }

    // (9)
    closesocket(socket_);

    WSACleanup();
    return 0;
}

2.2 客户端

客户端在连接上服务端之后,不间断的接收服务端的消息。

#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const std::string kIP = "127.0.0.1";
const u_short kPort = 10001;
const std::string kHelloClient = "hello, I'm client.";

int main()
{
    WSADATA wsaData;
    WORD wVersionRequested = MAKEWORD(2, 2);
    WSAStartup(wVersionRequested, &wsaData);

    SOCKET socket_ = INVALID_SOCKET;

    do
    {
        // (1)
        socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
        if (socket_ == INVALID_SOCKET) {
            std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }


        // (2)
        struct sockaddr_in addr = { 0 };
        addr.sin_family = AF_INET;
        addr.sin_addr.s_addr = inet_addr(kIP.c_str());
        addr.sin_port = htons(kPort);
        if (connect(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
            std::cout << "connect failed, GLE: " << WSAGetLastError() << std::endl;
            break;
        }


        fd_set fd_read;
        while (true)  // TODO 未处理何时退出的问题
        {
            // (3)
            FD_ZERO(&fd_read);
            FD_SET(socket_, &fd_read);

            // (4)
            timeval timeout = { 3, 0 };
            int ret = select(0, &fd_read, NULL, NULL, &timeout);
            if (ret == SOCKET_ERROR) {
                std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
                break;
            }

            // (5)
            if (FD_ISSET(socket_, &fd_read)) {
                char buf[100] = { 0 };
                int err = recv(socket_, buf, 100, 0);
                if (err > 0) {
                    std::cout << "recv: " << buf << std::endl;
                }
                else if (err == 0) {
                    std::cout << "connection closed." << std::endl;
                    break;
                }
                else {
                    std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
                    break;
                }
            }
        } // while
    } while (false);


    // (6)
    closesocket(socket_);

    WSACleanup();
    return 0;
}
 类似资料: