Workerman 原理解析概述

司寇星海
2023-12-01

首先说明一下Workerman究竟是什么东西: 

        WebSocket 是 HTML5 提供的一种网络通讯协议,用于服务端与客户端实时数据传输。广泛用于浏览器与服务器的实时通讯,APP与服务器的实时通讯等场景,相比传统HTTP协议请求响应式通讯,WebSocket协议可以做到实时的双向通讯,服务端可以在任何时候向客户端推送数据(HTTP协议需要客户端发起请求后才能推送),就是传统说的长链接(指在一个连接上可以连续发送多个数据包)

workman特点:

        1.Workerman类似一个PHP版本的nginx

        2.核心也是多进程+Epoll+非阻塞IO

        3.Workerman每个进程能维持上万并发连接

         4.由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能

        5.同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。

        6.拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。

        7.主进程为了保持稳定性,只负责监控子进程,不负责接收数据也不做任何业务逻辑

        8.纯PHP开发

        9.支持高并发:WorkerMan支持Libevent事件轮询库(需要安装event扩展),如果没有安装Event扩展则使用PHP内置的Select相关系统调用

相关命令:

        启动命令:php php文件名 start

        停止命令:php php文件名 stop

        重启命令:PHP php文件名 restart

        查看状态:PHP php文件名 status

        链接状态:PHP php文件名 connections

注意点:

        心跳是必须的:一定要有(非常重要):否则连接可能由于长时间不活跃而被路由节点防火墙断开 客户端和服务端协议一定要对应才能通讯,那么心跳断了该怎么办?这里就需要一个机制:断线重连。

        心跳断开的原因:

        1.浏览器最小化js被暂停

        2.浏览器切换到其它tab页面js被暂停

        3.电脑进入睡眠等等

        4.移动端切换网络

        5.信号变弱、手机黑屏、手机应用切换到后台、路由故障、业务主动断开

        解决办法:

        断线重连只能客户端做,服务端无法实现

        例如浏览器websocket需要监听onclose事件,当发生onclose时建立新的连接(为避免需崩可延建立连接)。更严格一点,服务端也应该定时发起心跳数据,并且客户端需要定时监测服务端的心跳数据是否超时,超过规定时间未收到服务端心跳数据应该认定连接已经断开,需要执行close关闭连接,并重新建立新的连接

支持协议的示例:

websocket
$websocket_worker = new Worker('websocket://0.0.0.0:2345');

text
$text_worker = new Worker('text://0.0.0.0:2346');

frame
$frame_worker = new Worker('frame://0.0.0.0:2347');

tcp
$tcp_worker = new Worker('tcp://0.0.0.0:2348');

udp
$udp_worker = new Worker('udp://0.0.0.0:2349');

unix
$unix_worker = new Worker('unix:///tmp/wm.sock');

获取包:

composer config -g --unset repos.packagist

composer create-project workerman/webmanw

目录结构:

├── app 应用目录 
│   ├── controller 控制器目录 
│   ├── model 模型目录 
│   ├── view 视图目录 
│   └── middleware 中间件目录 
│        └── StaticFile.php 自带静态文件中间件 
|   |—— functions.php 自定义函数 

├── config 配置目录
│   ├── app.php 应用配置 
│   ├── autoload.php 这里配置的文件会被自动加载 
│   ├── bootstrap.php 进程启动时onWorkerStart时运行的回调配置 
│   ├── container.php 容器配置
│   ├── dependence.php 容器依赖配置 
│   ├── database.php 数据库配置 
│   ├── exception.php 异常配置 
│   ├── log.php 日志配置 
│   ├── middleware.php 中间件配置 
│   ├── process.php 自定义进程配置 
│   ├── redis.php redis配置 
│   ├── route.php 路由配置 
│   ├── server.php 端口、进程数等服务器配置 
│   ├── view.php 视图配置 
│   ├── static.php 静态文件开关及静态文件中间件配置 
│   ├── translation.php 多语言配置 
│   └── session.php session配置 
├── public 静态资源目录 
├── process 自定义进程目录 
├── runtime 应用的运行时目录,需要可写权限 
├── start.php 服务启动文件 
├── vendor composer安装的第三方类库目录 
└── support 类库适配(包括第三方类库) 
     ├── Request.php 请求类 
     ├── Response.php 响应类 
     ├── Plugin.php 插件安装卸载脚本 
     ├── helpers.php 助手函数 
     └── bootstrap.php 进程启动后初始化脚本


原理(这里是精髓,请逐行阅读,这里面的几个函数一定要对照手册仔细研究):
class worker
{
    //连接事件回调
    public $onConnect =null;
    //消息事件回调
    public $onCMessage =null;
    //连接关闭事件回调
    public $onClose =null;
    //所有socket,包括监听的所有socket
    public $allsockets =array();

    //构造函数
    function __construct($address)
    {
        //创建监听socket
        $this->socket = stream_socket_server($address,$errno,$errstr);
        //设置为非阻塞
        stream_set_blocking($this->socket,0);
        //把监听的socet放入allsockets
        $this->allsockets[(int)$this->socket]=$this->socket;
    }

    //运行
    public function run(){
        while (1){
            //这里不监听socket可写事件和数据可读事件
            $write = $except = null;
            //监听所有的socket可读事件,包括客户端socket和监听端口的socket
            $read = $this->allsockets;
            //整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个可读参数
            stream_select($read,$write,$except);
            //$read被重新赋值,遍历所有状态为可读的socket
            foreach ($read as $index=>$socket){
                //如果监听的是可读的socket,说明有新的连接
                if($socket ===$this->socket){
                    //通过stream_socket_accept获取新的连接
                    $new_connect_socket = stream_socket_accept($this->socket);
                    if(!$new_connect_socket);continue;
                    //如果有onConnect事件回调,则尝试触发
                    if($this->onConnect){
                        call_user_func($this->onConnect,$new_connect_socket);
                    }
                   //将新的客户端新连接的socket放到allsockets中,stream_select监听可读事件
                    $this->allsockets[(int)$new_connect_socket] = $new_connect_socket;
                }else{
                    //读数据
                    $buffer = fread($socket,65535);
                    //数据为空,代表连接已断开
                    if($buffer === '' || $buffer ===false){
                        //尝试触发onClose回调
                        if($this->onClose){
                            call_user_func($this->onClose,$socket);
                        }
                        fclose($socket);
                        //从allsockets中删除对应的连接
                        unset($this->allsockets[(int)$socket]);
                        continue;
                    }
                    //尝试触发onMessage回调
                    call_user_func($this->onCMessage);
                }
            }
        }
    }
}
$server = new worker("tcp://0.0.0.0:12115");

$server->onConnect = function ($conn){
    echo 'onConnect\n';
};

$server->onCMessage = function ($conn,$msg){
    fwrite($conn,"HTTP/1.1 200 OK\r\n Connection:Keep-alive\r\nServer:workman\1.1.4\r\nContent-length:5\r\n\rhellow");
};

$server->onClose = function ($conn){
    echo 'onCLose\n';
};

$server->run();

例子

发送端:

<!doctype html>
<html lang="zh-cn">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <title>WebSocket大屏</title>
    <script src="jquery.min.js"></script>
</head>
<body>

<input type="text" id="content">
<input type="button" value="发送" οnclick="send()">
<script>

function connect() {
    ws = new WebSocket('ws://127.0.0.1:6161');
    ws.onmessage = function (e) {
        console.log(e.data);
    };
    ws.timer = setInterval(function () {
        ws.send('ping');
    }, 50000);
    ws.onclose = function () {
        clearTimeout(ws.timer);
        setTimeout(connect, 1000);
    };
}

//  通过WebSocket连接将数据发送给服务端
function send() {
    ws.send($('#content').val());
    $('#content').val('');
}

connect();
</script>
</body>
</html>

接收端:
 

<!doctype html>
<html lang="zh-cn">
<head>
    <meta charset="utf-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
    <script src="jquery.min.js"></script>
    <title>WebSocket大屏</title>
</head>
<body>
    <ul id="content">

    </ul>
</body>
<script>
    function connect() {
        // 与服务端建立WebSocket连接
        //(为了方便测试这里ip使用的是127.0.0.1,正式环境请使用外网ip)
        ws = new WebSocket('ws://127.0.0.1:6161');
        // 连接建立后发送daping,表明自己是电脑浏览器
        ws.onopen = function() {
            ws.send('daping');
        };
        //  收到服务端推送的数据后,将数据显示在浏览器里(心跳数据pong除外)
        ws.onmessage = function (e) {
            if (e.data !== 'pong') {
                $($('#content')).append('<li>'+e.data+'</li>');
            }
        };
        // 没隔50秒发送一个心跳数据 ping 给服务器,保持连接
        ws.timer = setInterval(function () {
            ws.send('ping');
        }, 50000);
        //  当连接关闭时清除定时器,并设置1秒后重连
        ws.onclose = function () {
            clearTimeout(ws.timer);
            setTimeout(connect, 1000);
        };
    }
    // 执行连接
    connect();
</script>
</html>

服务文件:
 

<?php

require 'autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;

// 使用websocket协议监听6161端口
$worker = new Worker('websocket://0.0.0.0:6161');

//  当浏览器(包括用户手机浏览器和电脑浏览器)发来消息时的处理逻辑
$worker->onMessage = function(TcpConnection $connection, $data) {
    // 这个静态变量用来存储电脑浏览器的websocket连接,方便推送使用
    static $daping_connection = null;
    switch ($data) {
        // 发送 daping 字符串的是电脑浏览器,将其连接保存到静态变量中
        case 'daping':
            $daping_connection = $connection;
            break;
        // ping 是心跳数据,用来维持连接,只返回 pong 字符串,无需做其它处理
        case 'ping':
            $connection->send('pong');
            break;
        // 用户手机浏览器发来的祝福语
        default:
            // 直接使用电脑浏览器的连接将祝福语推送给电脑
            if ($daping_connection) {
                $daping_connection->send($data);
            }
    }
};
Worker::runAll();

环境已经搭建完成,还需要了解一下后续的属性回调事件,接口

属性:

1.Id:当前worker进程的id编号

2.Count:设置当前Worker实例启动多少个进程

3.Name:设置当前Worker实例的名称

4.protocol:设置当前Worker实例的协议类

5.transport:设置当前Worker实例所使用的传输层协议

6.reusePort:设置当前worker是否开启监听端口复用(socket的SO_REUSEPORT选项)。

7.connections:存储了当前进程的所有的客户端连接对象,其中id为connection的id编号

8.logFile:指定日志文件路径

9. user:设置当前Worker实例以哪个用户运行

10. reloadable:设置当前Worker实例是否可以reload,即收到reload信号后是否退出重启

11.daemonize:是否以daemon(守护进程)方式运行

回调属性:

1.onWorkerStart:设置Worker子进程启动时的回调函数,每个子进程启动时都会执行。

2.onWorkerReload设置Worker收到reload信号后执行的回调

3.onConnect:当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。每个连接只会触发一次onConnect回调

4.onMessage当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数

5.onClose当客户端连接与Workerman断开时触发的回调函数。不管连接是如何断开的,只要断开就会触发onClose。每个连接只会触发一次

6.onBufferFull每个连接都有一个单独的应用层发送缓冲区,如果客户端接收速度小于服务端发送速度,数据会在应用层缓冲区暂存,如果缓冲区满则会触发onBufferFull回调

7.onBufferDrain每个连接都有一个单独的应用层发送缓冲区,缓冲区大小由TcpConnection::$maxSendBufferSize决定,默认值为1MB,可以手动设置更改大小,更改后会对所有连接生效。

8.onError当客户端的连接上发生错误时触发

接口:

1.Send:向客户端发送数据

2.getRemoteIp:获得该连接的客户端ip

3.getRemotePort:获得该连接的客户端端口

4.Close:调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调

5.Destroy:调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调

6.pauseRecv:使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用

7.resumeRecv:使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用

8.Pipe:将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用

文章还未写完,后续会继续续写跟新

 类似资料: