参考文章:swoft使用rabbitmq消息队列
添加以下内容
"require": {
"swoft/amqp": "^2.0"
},
{
"repositories": {
"swoft-amqp": {
"type": "git",
"url": "https://github.com/swoft-cloud/swoft-amqp.git"
}
}
}
'amqp' => [
'class' => Swoft\Amqp\Client::class,
'host' => 'rabbitmq',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
'vhost' => '/',
// 默认交换机、队列、路由名称,非强制,建议配置。当下面的channels没有具体设置时,会使用此配置
'queue' => 'test_queue',
'exchange' => 'exchange_test',
'route' => 'example-test-routing-key',
// 交换机类型。
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_test_02',
'queue' => 'subway',
'route' => 'route_subway',
],
],
],
'amqp.pool' => [
'class' => \Swoft\Amqp\Pool::class,
'client' => bean('amqp'),
],
不同进程消费不同队列的消息,修改channel_1
为对应的信道名称即可
CLog::info('worker-' . $workerId);
CLog::info('run method: ' . __METHOD__);
// 实例化不同信道
$channel = Amqp::channel('channel_1');
$channel->listen(function ($message) {
//$message:数据结构(json_encode)之后
//{"body":"hey!-----9","body_size":10,"is_truncated":false,"content_encoding":null,"delivery_info":{"channel":{"callbacks":{"amq.ctag-epGZgfHej3YrjZk2FBvp0A":{}}},"delivery_tag":99,"redelivered":false,"exchange":"exchange_test","routing_key":"example-test-routing-key","consumer_tag":"amq.ctag-epGZgfHej3YrjZk2FBvp0A"}}
CLog::info('message:' . json_encode($message));
// 模拟业务处理
sleep(random_int(10,20));
});
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'queue' => 'que_01',
'route' => 'route_ex_01',
]
],
// 给不同路由发消息,需要声明多个信道
$channel_1 = Amqp::channel('channel_1');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
}
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'queue' => 'que_01',
'route' => 'route_ex_01',
],
'channel_2'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'queue' => 'que_02',
'route' => 'route_ex_01',
],
],
// 给不同路由发消息,需要声明多个信道
$channel_1 = Amqp::channel('channel_1');
$channel_2 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
// 这里是$channel_1还是$channel_2,都是一样的效果
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
}
发送结果 | que_01 | que_02 |
---|---|---|
数量 | 10 | 10 |
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'route' => 'route_ex_01',
'queue' => 'que_01',
],
'channel_2'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'route' => 'route_ex_02',
'queue' => 'que_02',
],
],
// 给不同路由发消息,需要声明多个信道
$channel_1 = Amqp::channel('channel_1');
$channel_2 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
// 这里是$channel_1还是$channel_2,都是一样的效果
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
$channel_2->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_02' );
}
下面这样也是可以获得相同的发送结果,Amqp::channel()
是为了载入信道配置的
$channel_1 = Amqp::channel('channel_1');
// 如果注释掉下一行,则不会发送到que_02
$channel_1 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
// 这里是$channel_1还是$channel_2,都是一样的效果
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_02' );
}
发送结果 | que_01 | que_02 |
---|---|---|
数量 | 10 | 10 |
如果
$channel_2
在push
时,填入route_ex_01
,则发送结果如下
发送结果 | que_01 | que_02 |
---|---|---|
数量 | 20 | 0 |
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'route' => 'route_ex_01',
'queue' => 'que_01',
],
'channel_2'=>[ //对应的key即为channel_id
'exchange' => 'exchange_02',
'route' => 'route_ex_01',
'queue' => 'que_01',
],
],
// 给不同路由发消息,需要声明多个信道
$channel_1 = Amqp::channel('channel_1');
$channel_2 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
$channel_2->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
}
|发送结果|ex_01|ex_02|que_01
|–|–|–|
|数量|0|20|20|
在交换机中查看消息,是按照发送顺序排列;但是在消费消息队列时,很大可能出现不按发送顺序读取队列消息的情况
虽然建立了两个信道,但是却被最后的信道覆盖了exchange名,消息全部从最后一个交换机通过
'type' => AMQPExchangeType::DIRECT,
'channels' => [
'channel_1'=>[ //对应的key即为channel_id
'exchange' => 'exchange_01',
'route' => 'route_ex_01',
'queue' => 'que_01',
],
'channel_2'=>[ //对应的key即为channel_id
'exchange' => 'exchange_02',
'route' => 'route_ex_02',
'queue' => 'que_01',
],
],
// 给不同路由发消息,需要声明多个信道
$channel_1 = Amqp::channel('channel_1');
$channel_2 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
$channel_1->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_01' );
$channel_2->push('hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_02' );
}
发送结果 | ex_01 | ex_02 | que_01 |
---|---|---|---|
数量 | 0 | 20(In)/10(Out) | 10 |
虽然建立了不同的交换机,但是从结果看,所有信息都是通过ex_02发送,因此是20(In)。在转发到队列时,因为找不到
route_ex_01
,所以只发送出去10(Out)
如果是通过不同的生产者就可以正常推送到同一个queue中,适合N-1-1和N-N-1,如下所示
第一种,建立不同的生产者方法
// 调用
DirectDao::product1();
public static function product1()
{
$channel_1 = Amqp::channel('channel_2');
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
$channel_1->push('from:ex_1|hey!-----'.$i.date('Y-m-d h:i:s'),[],'route_ex_02' );
sleep(2);
}
$name = 'steve';
return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
}
第二种,通过传入不同的信道信息、路由,构建生产者信道
DirectDao::product('channel_1', 'route_ex_01');
public static function product($channel, $route_key)
{
$channel_1 = Amqp::channel($channel);
CLog::info('run method: '.__METHOD__);
for($i=0; $i<10; $i++)
{
$channel_1->push('from:'.$channel.'|hey!-----'.$i.date('Y-m-d h:i:s'),[],$route_key );
sleep(2);
}
$name = 'steve';
return context()->getResponse()->withContent('Hello' . ($name === '' ? '' : ", {$name}"));
}