PHP Socket 与 IO模型

莘欣怿
2023-12-01

Socket套接字是网络底层的核心,是TCP/IP以及UDP底层协议的实现通道。

IO模型

  • 阻塞/非阻塞

阻塞与非阻塞式针对IO过程中进程的状态而言的

阻塞IO是指调用结果返回之前当前线程会被挂起

非阻塞IO指的式在不能立即得到结果之前函数不会阻塞当前线程而会立即返回

  • 同步/异步

同步与异步式针对调用过程返回结果而言

同步是指在发出一个功能调用时在没有得到结果前调用不返回

异步指的是异步过程调用发出后调用者不能立即得到结果,实际处理调用的部件完成后通过状态、通知和回调来通知调用者。

  • 多路复用

为了提供数据在网络通信线路中传输的效率,在一条物理通信线路上建立多条逻辑通信信道,同时传输若干路信号的技术叫做多路复用技术。对于Socket来说能够同时处理多个连接的模型都被称为多路复用。目前常见的selectpollepollkqueue等IO模型。在这些多路复用的IO模型中,异步阻塞与异步非覅阻塞的扩展和性能最好。

同步阻塞IO模型

案例:客户端循环发送数据给服务器,最后发送结束服。服务端使用accept阻塞方式接收socket连接,然后循环接收到的数据,直到收到文件结束符才返回接收到的字节数。

服务器

$ vim server.php
<?php
set_time_limit(0);

class SocketServer
{
    private static $socket;

    public function __construct($host, $port)
    {
        global $errno, $errmsg;
        if($port < 1024)
        {
            die("port must be a number which bigger than 1024".PHP_EOL);
        }

        $addr = "tcp://{$host}:{$port}";
        $socket = stream_socket_server($addr, $errno, $errmsg);
        if(!$socket){
            die("error {$errno}: $errmsg".PHP_EOL);
        }
        
        //设置服务器socket超时
        while($connection = stream_socket_accept($socket, -1)){
            static $length = 0;

            //若没有读到结束符则继续读取
            $len = $length;
            $message = "";
            $buffer = "";
            while(!@preg_match('//r?/n/', $buffer)){
                $buffer = fread($connection, 1024);
                echo $buffer.PHP_EOL;
                if(!empty($buffer)){
                    $length += strlen($buffer);
                    $message .= @preg_replace('//r?/n/', "", $buffer);
                }
            }

            //读取数据大小
            $size = ($length - $len) * 8;
            fwrite($connection, "receive {$size} byte data/r/n");
            fclose($connection);
        }
        fclose($socket);
    }
}

$server = new SocketServer("0.0.0.0", 3000);
$ php server.php

客户端

$ vim client.php
<?php
function debug($msg){
    error_log($msg, 3, "/tmp/socket.log");
}

$arg = $argv[1];
if(empty($arg)){
    for($i=0; $i<10; $i++){
        $cmd = "php ".__FILE__." {$i}:test";
        system($cmd);
    }
    exit;
}

$addr = "tcp://0.0.0.0:3000";
$socket = stream_socket_client($addr, $errno, $ermsg, 30);
if(!$socket){
    die("error {$errno}: {$errmsg}");
}

$msg = trim($arg);
for($i=0; $i<10; $i++){
    fwrite($socket, $i.":".$msg);
    usleep(100000);
    $recv = fread($socket, 1024);//fread在阻塞模式下未督导数据是将会等待
    debug($recv);
}
fwrite($socket, "/r/n");//写入结束符

$data = fread($socket, 1024);
debug($data);
fclose($socket);
$ php client.php hello

问题缺陷:

使用telnet命令同时打开多个客户端时,会发现服务器一个时间只能处理一个客户端,其它需要在后面排队,这就是阻塞IO的特点,弱点很明显效率低下。

客户端在未发送文件结束符之前就向服务器索要返回数据,服务器由于未收到文件结束符也在向客户端索要 文件结束符,此时就形成了死锁。需要注意的是fread函数默认是阻塞的,如果没有设置超时很容易会出现死锁。

同步非阻塞IO模型select

IO模型:`select

实例:实现类似聊天室的功能,可以使用telnet连接和其它用户文字聊天,可以键入quit命令离开。客户端模拟一个登录用户连续发送10条消息然后退出。

服务器

$ vim server.php

使用多路复用实现非阻塞IO模型,服务器同时打开10个连接,同时进行模拟用户登录操作。

<?php
set_time_limit(0);

class Server
{
    private static $connections = [];
    private static $timeout = 60;
    private static $max_connect = 1024;
    private static $socket;

    public function __construct($port)
    {
        global $errno, $errmsg;
        if($port < 1024){
            exit("port must be a number which bigger than 1024".PHP_EOL);
        }

        $socket = socket_create_listen($port);
        if(!$socket){
            exit("socket listen {$port} falled".PHP_EOL);
        }

        //socket设置为非阻塞IO
        socket_set_nonblock($socket);

        while(true){
            $read = array_merge(self::$connections, [$socket]);
            $write= [];
            //选择一个连接获取读写连接通道
            if(socket_select($read, $write, $except=null, $tvsec=self::$timeout)){
                //如果是当前服务端监听的连接
                if(in_array($socket, $read)){
                    //接受客户端连接
                    $connect = socket_accept($socket);

                    $reject = "";
                    if(count(self::$connections) >= self::$max_connect){
                        $reject = "server full, try again later".PHP_EOL;
                    }
                    //将当前客户端连接放入socket_select
                    $id = (int)$connect;
                    self::$connections[$id] = $connect;
                    //输入连接资源缓存容器
                    $write[$id] = $connect;
                    //判断连接是否正常
                    self::debug("client {$id} come");
                    if(!empty($reject)){
                        socket_write($write[$id], $reject);
                        unset($write[$id]);
                        self::close($id);
                    }

                    $id = array_search($socket, $read);
                    unset($read[$id]);
                }
                //轮询读通道
                foreach($read as $fd){
                    $id = (int)$fd;
                    $line = @socket_read($fd, 2048, PHP_NORMAL_READ);
                    if($line === false){
                        self::debug("connection closed on socket {$id}");
                        self::close($id);
                        continue;
                    }

                    $tmp = substr($line, -1);
                    if($tmp!="/r" && $tmp!="/n"){
                        continue;
                    }

                    $line = trim($line);
                    if($line == "quit"){
                        self::debug("client {$id} quit");
                        self::close($id);
                        break;
                    }

                    self::debug("client {$id}: {$line}");
                }
                //轮询写通道
                foreach($write as $fd){
                    $id = (int)$fd;
                    $msg = "welcome to client {$id}".PHP_EOL;
                    socket_write($fd, $msg);
                }
            }
        }
    }
    public static function close($id)
    {
        $connect = self::$connections[$id];
        if(!empty($connect)){
            socket_shutdown($connect);
            socket_close($connect);
            unset(self::$connections[$id]);
        }
    }
    public static function debug($msg)
    {
        echo $msg.PHP_EOL;
        error_log($msg, 3, "/tmp/socket.log");
    }
}

$server = new Server(3000);

这个模型并没有真正实现异步,因为最终服务器程序还是要去通道中读取数据,得到 结果后同步返回给客户端。如果使用telnet命令同时打开多个客户端会发现,服务器可以同时处理这些连接,这就是非阻塞IO,当然要比古老的阻塞IO效率高很多,这仍会存在局限。

服务器提供两个设置参数

  • $timeout 表示是select的超时时间,过低会导致CPU负荷过高。
  • $max_connect 表示最大连接数,客户端超过服务器的最大连接数时会拒绝接收。

由于select是通过句柄来读写的,所以会收到系统默认参数_FD_SETSIZE的限制,一般默认为1024,修改的话需要重新编译内核。

通过测试发现select模式的性能会随着连接数的增加而线性降低,这也是select模型最大的问题所在,所以在高并发服务器下并不建议使用。

客户端

$ vim client.php
<?php
function debug($msg){
    echo $msg.PHP_EOL;
    error_log($msg, 3, "/tmp/socket.log");
}

$arg = $argv[1];
echo $arg.PHP_EOL;
if(empty($arg)){
    $arr = [];
    for($i=0; $i<10; $i++){
        $cmd = "php ".__FILE__." {$i}:test";
        debug($cmd);
        $arr[$i] = popen($cmd, "r");
    }
    foreach($arr as $item){
        pclose($item);
    }
    exit;
}

$addr = "tcp://0.0.0.0:3000";
$socket = stream_socket_client($addr, $errno, $ermsg, 30);
if(!$socket){
    die("error {$errno}: {$errmsg}");
}

$msg = trim($arg);
for($i=0; $i<10; $i++){
    fwrite($socket, $i.":".$msg);
    usleep(100000);
}
fwrite($socket, "quit/n");//写入结束符

$recv = fread($socket, 1024);
debug($recv);

fclose($socket);

运行测试

# 运行服务端
$ php server.php

# 运行客户端
$ php client.php

异步非阻塞IO模型epoll

IO模型:epoll

实例:基于PHP的libevent扩展实现,服务器接收客户端连接数据,然后返回接收的字节数返回给客户端。

安装配置PHP的libevent扩展

服务器

$ vim server.php
<?php
set_time_limit(0);

class Server
{
    private static $connections = [];
    private static $buffers = [];

    public function __construct($host="0.0.0.0", $port=3000)
    {
        global $errno, $errmsg;
        if($port < 1024){
            exit("port must be a number which bigger than 1024".PHP_EOL);
        }
        if(!extension_loaded("libevent")){
            exit("plaese install libevent extension firstly".PHP_EOL);
        }
        //创建socket服务器
        $addr = "tcp://{$host}:{$port}";
        $server = stream_socket_server($addr, $errno, $errmsg);
        if(!$server){
            exit("error {$errno} {$errmsg}".PHP_EOL);
        }
        //设置为非阻塞IO
        stream_set_blocking($server, 0);

        $base = event_base_new();
        $event = event_new();
        event_set(
            $event, 
            $server, 
            EV_READ|EV_PERSIST, 
            [__CLASS__, "accept"], 
            $base
        );
        event_base_set($event, $base);
        event_add($event);
        event_base_loop($base);
    }
    public function accept($socket, $flag, $base)
    {
        static $id = 0;

        $connect = stream_socket_accept($socket);
        stream_set_blocking($connect, 0);

        $id++;

        $buffer = event_buffer_new(
            $connect, 
            [__CLASS__, "read"], 
            [__CLASS__, "write"], 
            [__CLASS__, "error"], 
            $id
        );
        event_buffer_base_set($buffer, $base);
        event_buffer_timeout_set($buffer, 30, 30);
        event_buffer_watermark_set($buffer, EV_READ, 0, 0xfffff);
        event_buffer_priority_set($buffer, 10);
        event_buffer_enable($buffer, EV_READ | EV_PERSIST);

        self::$connections[$id] = $connect;
        self::$buffers[$id] = $buffer;
    }
    public function error($socket, $error, $id)
    {
        $connect = self::$connections[$id];
        $buffer = self::$buffers[$id];

        event_buffer_disable($buffer, EV_READ|EV_WRITE);
        event_buffer_free($buffer);

        fclose($connect);
        unset($buffer, $connect);
    }
    public function read($buffer, $id)
    {
        static $count = 0;

        $length = $count;
        $data = "";

        while($read = event_buffer_read($buffer, 1024)){
            $count += strlen($read);
            $data .= $read;
        }

        $size = ($count - $length) * 8;
        self::debug("client {$id}: ".__METHOD__.$data.PHP_EOL);

        $msg = "receive {$size} byte data /r/n";
        event_buffer_write($buffer, $msg);
    }

    public function write($buffer, $id)
    {
        $msg = "client {$id}:".__METHOD__.PHP_EOL;
        self::debug($msg);
    }

    public static function debug($msg)
    {
        echo $msg.PHP_EOL;
        error_log($msg, 3, "/tmp/socket.log");
    }
}

$server = new Server();

客户端

$ vim client.php
<?php
function debug($msg){
    echo $msg.PHP_EOL;
    error_log($msg, 3, "/tmp/socket.log");
}

$arg = $argv[1];
echo $arg.PHP_EOL;
if(empty($arg)){
    $arr = [];
    for($i=0; $i<10; $i++){
        $cmd = "php ".__FILE__." {$i}:test";
        debug($cmd);
        $arr[$i] = popen($cmd, "r");
    }
    foreach($arr as $item){
        pclose($item);
    }
    exit;
}

$addr = "tcp://0.0.0.0:3000";
$socket = stream_socket_client($addr, $errno, $ermsg, 30);
if(!$socket){
    die("error {$errno}: {$errmsg}");
}

$msg = trim($arg);
for($i=0; $i<10; $i++){
    fwrite($socket, $i.":".$msg);
    usleep(100000);
    debug(fread($socket, 1024));
}

fclose($socket);

epoll是为了解决poll天生的两个缺陷而生的,首先epoll模式没有限制,这也是基于epoll的Erlang服务器可以同时处理多并发连接的根本原因。另外,epoll模式的性能不会像select模式随着连接数的增加而性能下降。

# 查看Linux系统最大文件描述符数限制
$ cat /proc/sys/fs/file-max

epoll工作具有两种模式

  • LT(level triggered) 缺省模式,同时支持阻塞和非阻塞IO模式,虽然性能比后者查,但比较稳定。一般实际应用中,可使用此种模式,ET模式和WinSock都是纯异步非阻塞模式。
    lievent是在编译阶段选择系统的I/O demultiplex机制的,不支持在运行阶段根据配置再次素质呢。
  • ET(edge-triggered)
 类似资料: