内部进程间通讯
优质
小牛编辑
135浏览
2023-12-01
为了方便进程内部通讯,进行数据交换。imi v1.2.0 版本新增了内部进程间通讯封装。
使用 Swoole 提供的 sendMessage() 和 onPipeMessage 事件 实现。
在 onPipeMessage
事件中,收到指定结构的数据,就会触发相应事件。
我们只需要监听事件就行了。
介绍
数据结构
[
'action' => '动作名', // 此字段固定
// 其它参数任意增加即可
]
事件名称
IMI.PIPE_MESSAGE.动作名
代码示例
发送并监听
发送:
use Imi\Server\Server;
// 发送给所有 Worker 进程
Server::sendMessage('test', [
'time' => time(),
]);
// 发送给 WorkerId 为 1 的进程
Server::sendMessage('test', [
'time' => time(),
], 1);
监听:
事件名称为:IMI.PIPE_MESSAGE.test
<?php
namespace App\Listener;
use Imi\Event\EventParam;
use Imi\Event\IEventListener;
use Imi\Bean\Annotation\Listener;
/**
* @Listener("IMI.PIPE_MESSAGE.test")
*/
class TestMessage implements IEventListener
{
/**
* 事件处理方法
* @param EventParam $e
* @return void
*/
public function handle(EventParam $e)
{
$data = $e->getData()['data'];
var_dump($data['time']); // 接收到了上面发送来的 time
}
}
发送并获取返回数据
思路:
一般来讲,发送消息不像 http 请求,一定会有响应结果。
但有时候,我们需要获取返回数据。
办法很简单,比如:发送数据动作名为 testRequest
,再定义一个 testResponse
动作监听用于接收数据即可。
再使用 Channel
挂起协程等待响应结果,完美!
发送请求并等待响应:
use Imi\Server\Server;
use Imi\Util\Co\ChannelContainer;
// 生成一个随机ID
$id = uniqid('', true);
try {
$channel = ChannelContainer::getChannel($id);
// 发送给 WorkerId 为 1 的进程
Server::sendMessage('testRequest', [
'time' => time(),
], 1);
// 通过 Channel 获取结果,超时时间可以自行设置,这里是 30 秒
$result = $channel->pop(30);
if(false === $result)
{
throw new \RuntimeException('Receive error');
}
var_dump($result['datetime']); // 返回结果
} finally {
ChannelContainer::removeChannel($id);
}
监听请求:
事件名称为:IMI.PIPE_MESSAGE.testRequest
<?php
namespace App\Listener;
use Imi\Event\EventParam;
use Imi\Event\IEventListener;
use Imi\Bean\Annotation\Listener;
use Imi\Server\Server;
/**
* @Listener("IMI.PIPE_MESSAGE.testRequest")
*/
class TestRequestMessage implements IEventListener
{
/**
* 事件处理方法
* @param EventParam $e
* @return void
*/
public function handle(EventParam $e)
{
$data = $e->getData()['data'];
$datetime = date('Y-m-d H:i:s', $data['time']);
// 发送响应结果
Server::sendMessage('testResponse', [
'messageId' => $data['messageId'];
'datetime' => $datetime,
], $e->getData()['workerId']);
}
}
监听响应:
事件名称为:IMI.PIPE_MESSAGE.testResponse
<?php
namespace App\Listener;
use Imi\Event\EventParam;
use Imi\Event\IEventListener;
use Imi\Bean\Annotation\Listener;
use Imi\Server\Server;
use Imi\Util\Co\ChannelContainer;
/**
* @Listener("IMI.PIPE_MESSAGE.testResponse")
*/
class TestResponseMessage implements IEventListener
{
/**
* 事件处理方法
* @param EventParam $e
* @return void
*/
public function handle(EventParam $e)
{
$data = $e->getData()['data'];
if(ChannelContainer::hasChannel($data['messageId']))
{
// 推结果进 Channel
ChannelContainer::push($data['messageId'], $data);
}
}
}