RabbitMQ(AMQP)

优质
小牛编辑
133浏览
2023-12-01

介绍

支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ

支持消息发布和消费

Github: https://github.com/imiphp/imi-amqp

Composer

本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:

{
    "require": {
        "imiphp/imi-amqp": "^1.0.0"
    }
}

然后执行 composer update 安装。

使用说明

可以参考 example 目录示例,包括完整的消息发布和消费功能。

在项目 config/config.php 中配置:

[
    'components'    =>  [
        // 引入组件
        'AMQP'   =>  'Imi\AMQP',
    ],
]

连接池配置:

[
    'pools'    =>    [
        'rabbit'    =>  [
            'sync'    =>    [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPSyncPool::class,
                    'config'    =>    [
                        'maxResources'    =>    10,
                        'minResources'    =>    0,
                    ],
                ],
                'resource'    =>    [
                    'host'      => '127.0.0.1',
                    'port'      => 5672,
                    'user'      => 'guest',
                    'password'  => 'guest',
                ]
            ],
            'async'    =>    [
                'pool'    =>    [
                    'class'        =>    \Imi\AMQP\Pool\AMQPCoroutinePool::class,
                    'config'    =>    [
                        'maxResources'    =>    10,
                        'minResources'    =>    1,
                    ],
                ],
                'resource'    =>    [
                    'host'      => '127.0.0.1',
                    'port'      => 5672,
                    'user'      => 'guest',
                    'password'  => 'guest',
                ]
            ],
        ],
    ]
]

默认连接池:

[
    'beans' =>  [
        'AMQP'  =>  [
            'defaultPoolName'   =>  'rabbit',
        ],
    ],
]

连接配置项

属性名称说明
host主机
port端口
user用户名
vhostvhost,默认/
insistinsist
loginMethod默认AMQPLAIN
loginResponseloginResponse
locale默认en_US
connectionTimeout连接超时
readWriteTimeout读写超时
keepalivekeepalive,默认false
heartbeat心跳时间,默认0
channelRpcTimeout频道 RPC 超时时间,默认0.0
sslProtocolssl 协议,默认null

消息定义

继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。

根据需要可以覆盖实现setBodyDatagetBodyData方法,实现自定义的消息结构。

<?php
namespace ImiApp\AMQP\Test2;

use Imi\AMQP\Message;

class TestMessage2 extends Message
{
    /**
     * 用户ID
     *
     * @var int
     */
    private $memberId;

    /**
     * 内容
     *
     * @var string
     */
    private $content;

    public function __construct()
    {
        parent::__construct();
        $this->routingKey = 'imi-2';
        $this->format = \Imi\Util\Format\Json::class;
    }

    /**
     * 设置主体数据
     *
     * @param mixed $data
     * @return self
     */
    public function setBodyData($data)
    {
        foreach($data as $k => $v)
        {
            $this->$k = $v;
        }
    }

    /**
     * 获取主体数据
     *
     * @return mixed
     */
    public function getBodyData()
    {
        return [
            'memberId'  =>  $this->memberId,
            'content'   =>  $this->content,
        ];
    }

    /**
     * Get 用户ID
     *
     * @return int
     */ 
    public function getMemberId()
    {
        return $this->memberId;
    }

    /**
     * Set 用户ID
     *
     * @param int $memberId  用户ID
     *
     * @return self
     */ 
    public function setMemberId(int $memberId)
    {
        $this->memberId = $memberId;

        return $this;
    }

    /**
     * Get 内容
     *
     * @return string
     */ 
    public function getContent()
    {
        return $this->content;
    }

    /**
     * Set 内容
     *
     * @param string $content  内容
     *
     * @return self
     */ 
    public function setContent(string $content)
    {
        $this->content = $content;

        return $this;
    }

}

属性列表:

名称说明默认值
bodyData消息主体内容,非字符串null
properties属性['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,]
routingKey路由键空字符串
format如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持JsonPhpSerializenull
mandatorymandatory标志位false
immediateimmediate标志位false
ticketticketnull

发布者定义

必选注解:@Publisher

可选注解:@Queue@Exchange@Connection

不配置 @Connection 注解,可以从连接池中获取连接

<?php
namespace ImiApp\AMQP\Test;

use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BasePublisher;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Annotation\Publisher;
use Imi\AMQP\Annotation\Connection;

/**
 * @Bean("TestPublisher")
 * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
 * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1")
 * @Queue(name="queue-imi-1", routingKey="imi-1")
 * @Exchange(name="exchange-imi")
 */
class TestPublisher extends BasePublisher
{

}

该类所在命名空间必须能被扫描到,要在beanScan中配置

发布消息

// 实例化构建消息
$message = new \ImiApp\AMQP\Test2\TestMessage2;
$message->setMemberId(1);
$message->setContent('imi niubi');

// 发布消息
/** @var \ImiApp\AMQP\Test\TestPublisher $testPublisher */
$testPublisher = App::getBean('TestPublisher');
$testPublisher->publish($message);

消费者定义

必选注解:@Consumer

可选注解:@Queue@Exchange@Connection

不配置 @Connection 注解,可以从连接池中获取连接

<?php
namespace ImiApp\AMQP\Test;

use Imi\Redis\Redis;
use Imi\Bean\Annotation\Bean;
use Imi\AMQP\Annotation\Queue;
use Imi\AMQP\Base\BaseConsumer;
use Imi\AMQP\Contract\IMessage;
use Imi\AMQP\Annotation\Consumer;
use Imi\AMQP\Annotation\Exchange;
use Imi\AMQP\Enum\ConsumerResult;
use Imi\AMQP\Annotation\Connection;

/**
 * 启动一个新连接消费
 * 
 * @Bean("TestConsumer")
 * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest")
 * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\ImiApp\AMQP\Test\TestMessage::class)
 */
class TestConsumer extends BaseConsumer
{
    /**
     * 消费任务
     *
     * @param \ImiApp\AMQP\Test\TestMessage $message
     * @return void
     */
    protected function consume(IMessage $message)
    {
        var_dump(__CLASS__, $message->getBody(), get_class($message));
        Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody());
        return ConsumerResult::ACK;
    }

}

消费消息

随服务启动的消费进程

只会启动一个进程,适合量少的场景。适合IO密集型场景。

首先定义进程:

<?php
namespace ImiApp\Process;

use Imi\Process\BaseProcess;
use Imi\Aop\Annotation\Inject;
use Imi\Process\Annotation\Process;

/**
 * @Process(name="TestProcess")
 */
class TestProcess extends BaseProcess
{
    /**
     * @Inject("TestConsumer")
     *
     * @var \ImiApp\AMQP\Test\TestConsumer
     */
    protected $testConsumer;

    /**
     * @Inject("TestConsumer2")
     *
     * @var \ImiApp\AMQP\Test2\TestConsumer2
     */
    protected $testConsumer2;

    public function run(\Swoole\Process $process)
    {
        // 启动消费者
        go(function(){
            do {
                $this->testConsumer->run();
            } while(true);
        });
        go(function(){
            do {
                $this->testConsumer2->run();
            } while(true);
        });
    }

}

然后在项目配置@app.beans中配置消费进程

[
    'AutoRunProcessManager' =>  [
        'processes' =>  [
            'TestProcess'
        ],
    ],
]

启动进程池消费

适合计算密集型场景、消费量非常多的场景。

进程池写法参考:https://doc.imiphp.com/components/process-pool/swoole.html

启动消费者写法参考上面的即可。

注解说明

@Publisher

发布者注解

属性名称说明
queue队列名称
exchange交换机名称
routingKey路由键

@Consumer

消费者注解

属性名称说明
tag消费者标签
queue队列名称
exchange交换机名称
routingKey路由键
message消息类名,默认:Imi\AMQP\Message
mandatorymandatory标志位
immediateimmediate标志位
ticketticket

@Queue

队列注解

属性名称说明
name队列名称
routingKey路由键
passive被动模式,默认false
durable消息队列持久化,默认true
exclusive独占,默认false
autoDelete自动删除,默认false
nowait是否非阻塞,默认false
arguments参数
ticketticket

@Exchange

交换机注解

属性名称说明
name交换机名称
type类型可选:directfanouttopicheaders
passive被动模式,默认false
durable消息队列持久化,默认true
autoDelete自动删除,默认false
internal设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用
nowait是否非阻塞,默认false
arguments参数
ticketticket

@Connection

连接注解

属性名称说明
poolName不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果hostportuserpassword都未设置,则获取默认的连接池。
host主机
port端口
user用户名
vhostvhost,默认/
insistinsist
loginMethod默认AMQPLAIN
loginResponseloginResponse
locale默认en_US
connectionTimeout连接超时
readWriteTimeout读写超时
keepalivekeepalive,默认false
heartbeat心跳时间,默认0
channelRpcTimeout频道 RPC 超时时间,默认0.0
sslProtocolssl 协议,默认null