首先说明一下Workerman究竟是什么东西:
WebSocket 是 HTML5 提供的一种网络通讯协议,用于服务端与客户端实时数据传输。广泛用于浏览器与服务器的实时通讯,APP与服务器的实时通讯等场景,相比传统HTTP协议请求响应式通讯,WebSocket协议可以做到实时的双向通讯,服务端可以在任何时候向客户端推送数据(HTTP协议需要客户端发起请求后才能推送),就是传统说的长链接(指在一个连接上可以连续发送多个数据包)
workman特点:
1.Workerman类似一个PHP版本的nginx
2.核心也是多进程+Epoll+非阻塞IO
3.Workerman每个进程能维持上万并发连接
4.由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能
5.同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。
6.拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。
7.主进程为了保持稳定性,只负责监控子进程,不负责接收数据也不做任何业务逻辑
8.纯PHP开发
9.支持高并发:WorkerMan支持Libevent事件轮询库(需要安装event扩展),如果没有安装Event扩展则使用PHP内置的Select相关系统调用
相关命令:
启动命令:php php文件名 start
停止命令:php php文件名 stop
重启命令:PHP php文件名 restart
查看状态:PHP php文件名 status
链接状态:PHP php文件名 connections
注意点:
心跳是必须的:一定要有(非常重要):否则连接可能由于长时间不活跃而被路由节点防火墙断开 客户端和服务端协议一定要对应才能通讯,那么心跳断了该怎么办?这里就需要一个机制:断线重连。
心跳断开的原因:
1.浏览器最小化js被暂停
2.浏览器切换到其它tab页面js被暂停
3.电脑进入睡眠等等
4.移动端切换网络
5.信号变弱、手机黑屏、手机应用切换到后台、路由故障、业务主动断开
解决办法:
断线重连只能客户端做,服务端无法实现
例如浏览器websocket需要监听onclose事件,当发生onclose时建立新的连接(为避免需崩可延建立连接)。更严格一点,服务端也应该定时发起心跳数据,并且客户端需要定时监测服务端的心跳数据是否超时,超过规定时间未收到服务端心跳数据应该认定连接已经断开,需要执行close关闭连接,并重新建立新的连接
支持协议的示例:
websocket $websocket_worker = new Worker('websocket://0.0.0.0:2345'); text $text_worker = new Worker('text://0.0.0.0:2346'); frame $frame_worker = new Worker('frame://0.0.0.0:2347'); tcp $tcp_worker = new Worker('tcp://0.0.0.0:2348'); udp $udp_worker = new Worker('udp://0.0.0.0:2349'); unix $unix_worker = new Worker('unix:///tmp/wm.sock');
获取包:
composer config -g --unset repos.packagist
composer create-project workerman/webmanw
目录结构: ├── app 应用目录 │ ├── controller 控制器目录 │ ├── model 模型目录 │ ├── view 视图目录 │ └── middleware 中间件目录 │ └── StaticFile.php 自带静态文件中间件 | |—— functions.php 自定义函数 ├── config 配置目录 │ ├── app.php 应用配置 │ ├── autoload.php 这里配置的文件会被自动加载 │ ├── bootstrap.php 进程启动时onWorkerStart时运行的回调配置 │ ├── container.php 容器配置 │ ├── dependence.php 容器依赖配置 │ ├── database.php 数据库配置 │ ├── exception.php 异常配置 │ ├── log.php 日志配置 │ ├── middleware.php 中间件配置 │ ├── process.php 自定义进程配置 │ ├── redis.php redis配置 │ ├── route.php 路由配置 │ ├── server.php 端口、进程数等服务器配置 │ ├── view.php 视图配置 │ ├── static.php 静态文件开关及静态文件中间件配置 │ ├── translation.php 多语言配置 │ └── session.php session配置 ├── public 静态资源目录 ├── process 自定义进程目录 ├── runtime 应用的运行时目录,需要可写权限 ├── start.php 服务启动文件 ├── vendor composer安装的第三方类库目录 └── support 类库适配(包括第三方类库) ├── Request.php 请求类 ├── Response.php 响应类 ├── Plugin.php 插件安装卸载脚本 ├── helpers.php 助手函数 └── bootstrap.php 进程启动后初始化脚本 原理(这里是精髓,请逐行阅读,这里面的几个函数一定要对照手册仔细研究): class worker { //连接事件回调 public $onConnect =null; //消息事件回调 public $onCMessage =null; //连接关闭事件回调 public $onClose =null; //所有socket,包括监听的所有socket public $allsockets =array(); //构造函数 function __construct($address) { //创建监听socket $this->socket = stream_socket_server($address,$errno,$errstr); //设置为非阻塞 stream_set_blocking($this->socket,0); //把监听的socet放入allsockets $this->allsockets[(int)$this->socket]=$this->socket; } //运行 public function run(){ while (1){ //这里不监听socket可写事件和数据可读事件 $write = $except = null; //监听所有的socket可读事件,包括客户端socket和监听端口的socket $read = $this->allsockets; //整个程序阻塞在这里,等待$read里面的socket可读,这里$read是个可读参数 stream_select($read,$write,$except); //$read被重新赋值,遍历所有状态为可读的socket foreach ($read as $index=>$socket){ //如果监听的是可读的socket,说明有新的连接 if($socket ===$this->socket){ //通过stream_socket_accept获取新的连接 $new_connect_socket = stream_socket_accept($this->socket); if(!$new_connect_socket);continue; //如果有onConnect事件回调,则尝试触发 if($this->onConnect){ call_user_func($this->onConnect,$new_connect_socket); } //将新的客户端新连接的socket放到allsockets中,stream_select监听可读事件 $this->allsockets[(int)$new_connect_socket] = $new_connect_socket; }else{ //读数据 $buffer = fread($socket,65535); //数据为空,代表连接已断开 if($buffer === '' || $buffer ===false){ //尝试触发onClose回调 if($this->onClose){ call_user_func($this->onClose,$socket); } fclose($socket); //从allsockets中删除对应的连接 unset($this->allsockets[(int)$socket]); continue; } //尝试触发onMessage回调 call_user_func($this->onCMessage); } } } } } $server = new worker("tcp://0.0.0.0:12115"); $server->onConnect = function ($conn){ echo 'onConnect\n'; }; $server->onCMessage = function ($conn,$msg){ fwrite($conn,"HTTP/1.1 200 OK\r\n Connection:Keep-alive\r\nServer:workman\1.1.4\r\nContent-length:5\r\n\rhellow"); }; $server->onClose = function ($conn){ echo 'onCLose\n'; }; $server->run();
例子:
发送端:
<!doctype html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<title>WebSocket大屏</title>
<script src="jquery.min.js"></script>
</head>
<body>
<input type="text" id="content">
<input type="button" value="发送" οnclick="send()">
<script>
function connect() {
ws = new WebSocket('ws://127.0.0.1:6161');
ws.onmessage = function (e) {
console.log(e.data);
};
ws.timer = setInterval(function () {
ws.send('ping');
}, 50000);
ws.onclose = function () {
clearTimeout(ws.timer);
setTimeout(connect, 1000);
};
}
// 通过WebSocket连接将数据发送给服务端
function send() {
ws.send($('#content').val());
$('#content').val('');
}
connect();
</script>
</body>
</html>
接收端:
<!doctype html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<script src="jquery.min.js"></script>
<title>WebSocket大屏</title>
</head>
<body>
<ul id="content">
</ul>
</body>
<script>
function connect() {
// 与服务端建立WebSocket连接
//(为了方便测试这里ip使用的是127.0.0.1,正式环境请使用外网ip)
ws = new WebSocket('ws://127.0.0.1:6161');
// 连接建立后发送daping,表明自己是电脑浏览器
ws.onopen = function() {
ws.send('daping');
};
// 收到服务端推送的数据后,将数据显示在浏览器里(心跳数据pong除外)
ws.onmessage = function (e) {
if (e.data !== 'pong') {
$($('#content')).append('<li>'+e.data+'</li>');
}
};
// 没隔50秒发送一个心跳数据 ping 给服务器,保持连接
ws.timer = setInterval(function () {
ws.send('ping');
}, 50000);
// 当连接关闭时清除定时器,并设置1秒后重连
ws.onclose = function () {
clearTimeout(ws.timer);
setTimeout(connect, 1000);
};
}
// 执行连接
connect();
</script>
</html>
服务文件:
<?php
require 'autoload.php';
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
// 使用websocket协议监听6161端口
$worker = new Worker('websocket://0.0.0.0:6161');
// 当浏览器(包括用户手机浏览器和电脑浏览器)发来消息时的处理逻辑
$worker->onMessage = function(TcpConnection $connection, $data) {
// 这个静态变量用来存储电脑浏览器的websocket连接,方便推送使用
static $daping_connection = null;
switch ($data) {
// 发送 daping 字符串的是电脑浏览器,将其连接保存到静态变量中
case 'daping':
$daping_connection = $connection;
break;
// ping 是心跳数据,用来维持连接,只返回 pong 字符串,无需做其它处理
case 'ping':
$connection->send('pong');
break;
// 用户手机浏览器发来的祝福语
default:
// 直接使用电脑浏览器的连接将祝福语推送给电脑
if ($daping_connection) {
$daping_connection->send($data);
}
}
};
Worker::runAll();
环境已经搭建完成,还需要了解一下后续的属性,回调事件,接口
属性:
1.Id:当前worker进程的id编号
2.Count:设置当前Worker实例启动多少个进程
3.Name:设置当前Worker实例的名称
4.protocol:设置当前Worker实例的协议类
5.transport:设置当前Worker实例所使用的传输层协议
6.reusePort:设置当前worker是否开启监听端口复用(socket的SO_REUSEPORT选项)。
7.connections:存储了当前进程的所有的客户端连接对象,其中id为connection的id编号
8.logFile:指定日志文件路径
9. user:设置当前Worker实例以哪个用户运行
10. reloadable:设置当前Worker实例是否可以reload,即收到reload信号后是否退出重启
11.daemonize:是否以daemon(守护进程)方式运行
回调属性:
1.onWorkerStart:设置Worker子进程启动时的回调函数,每个子进程启动时都会执行。
2.onWorkerReload:设置Worker收到reload信号后执行的回调
3.onConnect:当客户端与Workerman建立连接时(TCP三次握手完成后)触发的回调函数。每个连接只会触发一次onConnect回调
4.onMessage:当客户端通过连接发来数据时(Workerman收到数据时)触发的回调函数
5.onClose:当客户端连接与Workerman断开时触发的回调函数。不管连接是如何断开的,只要断开就会触发onClose。每个连接只会触发一次
6.onBufferFull:每个连接都有一个单独的应用层发送缓冲区,如果客户端接收速度小于服务端发送速度,数据会在应用层缓冲区暂存,如果缓冲区满则会触发onBufferFull回调
7.onBufferDrain:每个连接都有一个单独的应用层发送缓冲区,缓冲区大小由TcpConnection::$maxSendBufferSize决定,默认值为1MB,可以手动设置更改大小,更改后会对所有连接生效。
8.onError:当客户端的连接上发生错误时触发
接口:
1.Send:向客户端发送数据
2.getRemoteIp:获得该连接的客户端ip
3.getRemotePort:获得该连接的客户端端口
4.Close:调用close会等待发送缓冲区的数据发送完毕后才关闭连接,并触发连接的onClose回调
5.Destroy:调用destroy后即使该连接的发送缓冲区还有数据未发送到对端,连接也会立刻被关闭,并立刻触发该连接的onClose回调
6.pauseRecv:使当前连接停止接收数据。该连接的onMessage回调将不会被触发。此方法对于上传流量控制非常有用
7.resumeRecv:使当前连接继续接收数据。此方法与Connection::pauseRecv配合使用,对于上传流量控制非常有用
8.Pipe:将当前连接的数据流导入到目标连接。内置了流量控制。此方法做TCP代理非常有用
文章还未写完,后续会继续续写跟新