一、生产消息
<?php
namespace app\controllers;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use yii\web\Controller;
class QueueController extends Controller
{
private $connect = '';
public function init()
{
parent::init();
$config = [
'host' => '192.168.56.102',
'port' => 5672,
'user' => 'admin',
'pwd' => 123456,
'vhost' => '/'
];
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
return $this->connect;
}
/**
* 简单队列
* 无需申明交换机(默认交换机)
* 发送消息 routingKey需要与队列名称一致
*/
public function actionSimple()
{
$conn = $this->connect;
$queueName = 'queue_simple';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$msgBody = json_encode(['code' => 200, 'data' => ['name' => '里斯', 'age' => 18]]);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);
$channel->basic_publish($msg, '', 'queue_simple');
$channel->close();
}
/**
* fanout 广播消息
* 队列绑定交换机
* 消息发送到所有绑定的队列
* 无需申明routingKey 申明了也无效
*/
public function actionFanout()
{
$conn = $this->connect;
$conn->channel();
$exchangeName = 'exchange_fanout';
$queueName1 = 'queue_fanout1';
$queueName2 = 'queue_fanout2';
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'fanout', false, true, false, false);
$channel->queue_declare($queueName1, false, true, false, false);
$channel->queue_declare($queueName2, false, true, false, false);
$channel->queue_bind($queueName1, $exchangeName, '');
$channel->queue_bind($queueName2, $exchangeName, '');
$msgBody = 'I am fanout message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, '');
}
/**
* 直连模式
* 队列需绑定交换机和路由
*/
public function actionDirect()
{
$conn = $this->connect;
$channel = $conn->channel();
$exchangeName = 'exchange_direct';
$rk = 'success';
$channel->exchange_declare($exchangeName, 'direct', false, true, false, false);
$msgBody = 'This is success message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $rk);
}
/**
* 主题模式
* [*表示一个单词 #表示零个或多个]
*/
public function actionTopic()
{
$conn = $this->connect;
$channel = $conn->channel();
$exchangeName = 'exchange_topic';
$rk = 'mail.success';
$channel->exchange_declare($exchangeName, 'topic', false, true, false, false);
$msgBody = 'This is send.mail.success message';
$msg = new AMQPMessage($msgBody, ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $rk);
}
/**
* 发送消息确认
* confirm模式 交换机向队列发送消息
*/
public function actionConfirm()
{
$exchangeName = 'exchange_confirm';
$queueName = 'queue_confirm';
$routingKey = 'confirm';
$conn = $this->connect;
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
//发送成功回调
$channel->set_ack_handler(function (AMQPMessage $message) {
echo $message->body . ' send succeed';
});
//发送失败回调
$channel->set_nack_handler(function (AMQPMessage $message) {
echo $message->body . ' send failed';
});
//开启发送监听
$channel->confirm_select();
$msg = new AMQPMessage('测试消息', ['delivery_mode' => 2]);
$channel->basic_publish($msg, $exchangeName, $routingKey);
//阻塞等待消息确认
$channel->wait_for_pending_acks();
}
/**
* 死信队列 延迟队列
* 当队列和消息同时设置过期时间 以最短时间为准
* 消费死信队列消息
*/
public function actionDead()
{
$exchangeName = 'exchange_ttl';
$queueName = 'queue_ttl';
$routingKey = 'ttl';
$deadExchangeName = 'exchange_dead';
$deadQueueName = 'queue_dead';
$deadRoutingKey = 'dead_key';
$conn = $this->connect;
$channel = $conn->channel();
$channel->exchange_declare($exchangeName, 'direct', false, true, false);
$channel->exchange_declare('exchange_dead', 'direct', false, true, false);
//死信队列参数
$arguments = new AMQPTable([
'x-dead-letter-exchange' => $deadExchangeName,
'x-dead-letter-routing-key' => $deadRoutingKey,
'x-message-ttl' => 10000
]);
$channel->queue_declare($queueName, false, true, false, false, false, $arguments);
$channel->queue_declare('queue_dead', false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, $routingKey);
$channel->queue_bind($deadQueueName, $deadExchangeName, $deadRoutingKey);
$message = new AMQPMessage('5s后该消息将过期', ['delivery_mode' => 2, 'expiration' => 5000]);
$channel->basic_publish($message, $exchangeName, $routingKey);
}
public function __destruct()
{
$conn = $this->connect;
$conn->close();
}
}
单单设置队列的ttl,或者单单设置相同的消息过期时间,死信队列是能正常工作的。但是设置不同的消息过期时间,就可能无法正常使用死信队列了。
当MQ检查队列中的第一个消息时,发现其并未过期,则不会继续检查之后的消息了。即使之后的消息过期了,也会因为没在队列头部而无法流转到其他队列,这是MQ队列的特性决定的。你不能去消费队列中间的消息,队列必须先进先出。
对于设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而设置消息头部属性,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期时在即将投递到消费者之前判定的,为什么两者得处理方法不一致?因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。
二 、消费消息
<?php
namespace app\commands;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use yii\console\Controller;
class ReadController extends Controller
{
protected $connect = '';
public function init()
{
parent::init();
$config = [
'host' => '192.168.56.102',
'port' => 5672,
'user' => 'admin',
'pwd' => 123456,
'vhost' => '/'
];
$this->connect = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pwd'], $config['vhost']);
return $this->connect;
}
public function process_message($message)
{
$message = $message->body;
var_dump($message);
}
public function actionSimple()
{
$conn = $this->connect;
$queueName = 'queue_simple';
$channel = $conn->channel();
$channel->basic_consume($queueName, '', false, true, false, false,
function ($msg) {
$this->process_message($msg);
});
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionFan1()
{
$conn = $this->connect;
$queueName = 'queue_fanout1';
$channel = $conn->channel();
$channel->basic_consume($queueName, '', false, true, false, false, function ($msg) {
$this->process_message($msg);
});
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionFan2()
{
$conn = $this->connect;
$queueName = 'queue_fanout2';
$channel = $conn->channel();
$callback = function ($message) {
echo 'I am fanout2;' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDirect()
{
$conn = $this->connect;
$exchangeName = 'exchange_direct';
$queueName = 'queue_direct';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'info');
$callback = function ($message) {
echo 'INFO:' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDirect1()
{
$conn = $this->connect;
$exchangeName = 'exchange_direct';
$queueName = 'queue_direct1';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'error');
$channel->queue_bind($queueName, $exchangeName, 'warning');
$callback = function ($message) {
echo 'ERROR or WARNING:' . PHP_EOL;
print_r($message->body);
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionTopic1()
{
$conn = $this->connect;
$exchangeName = 'exchange_topic';
$queueName = 'queue_topic1';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, '*.mail');
$callback = function ($message) {
echo $message->body . PHP_EOL;
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionTopic2()
{
$conn = $this->connect;
$exchangeName = 'exchange_topic';
$queueName = 'queue_topic2';
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$channel->queue_bind($queueName, $exchangeName, 'mail.#');
$callback = function ($message) {
echo $message->body . PHP_EOL;
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
/**
* 消费消息 消费成功确认
*/
public function actionAck()
{
$queueName = 'queue_confirm';
$conn = $this->connect;
$channel = $conn->channel();
$channel->queue_declare($queueName, false, true, false, false);
$callback = function ($message) {
$msg = $message->body;
echo 'message==' . $msg . PHP_EOL;
if ($msg == '200') {
//消费成功 手动应答
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
} else {
/**
* 消费失败 basic_nack($delivery_tag, $multiple = false, $requeue = false)
* requeue 是否重新发送 false 否 true是
*/
$message->delivery_info['channel']->basic_nack($message->delivery_info['delivery_tag'], false, false);
}
};
//消费成功后再发送下一条消息
$channel->basic_qos(null, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
}
public function actionDead()
{
$conn = $this->connect;
$channel = $conn->channel();
$callback = function ($message) {
echo $message->body . PHP_EOL;
sleep(2);
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};
$channel->basic_consume('queue_dead', '', false, false, false, false, $callback);
$channel->basic_qos(null, 1, null);
while (count($channel->callbacks)) {
$channel->wait();
}
}
}
参数说明:
建立连接
$conn = new AMQPStreamConnection( $host,//RabbitMQ服务器主机IP地址 $port,//RabbitMQ服务器端口 $user,//连接RabbitMQ服务器的用户名 $password,//连接RabbitMQ服务器的用户密码 $vhost,//连接RabbitMQ服务器的vhost(服务器可以有多个vhost,虚拟主机,类似nginx的vhost) $insist = false, $login_method = 'AMQPLAIN', $login_response = null, $locale = 'en_US', $connection_timeout = 3.0,//连接超时 $read_write_timeout = 3.0,//读写超时 $context = null,//上下文 $keepalive = false, //是否开启长连接 常驻进程消费者需要 $heartbeat = 0,//心跳检测间隔 单位秒 0不检测 根据情形酌情填写 $channel_rpc_timeout = 0.0, $ssl_protocol = null );
申明交换机 $channel->exchange_declare($exhcange_name, $type, $passive, $durable, $auto_delete,$internal,$nowait,$arguments,$ticket); 参数: $exhcange_name 交换器名字 $type 交换器类型 $passive 是否检测同名队列 $durable 交换机是否开启持久化 $auto_detlete 通道关闭后是否删除队列 (1)交换器类型 枚举 [ direct: (默认)直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue, fanout: 广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue, topic: 主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。(* 表是匹配一个任意词组,#表示匹配0个或多个词组), headers:根据消息体的header匹配 ]
申明队列
$channel->queue_declare($queue_name, $passive, $durable, $exclusive, $auto_delete,$nowait,$arguments,$ticket); 参数: $queue_name 队列名称 $passive 是否检测同名队列 $durable 是否开启队列持久化 $exclusive 队列是否可以被其他队列访问 $auto_delete 通道关闭后是否删除队列
推送消息
$channel->basic_publish($msg, $exchange = '', $routing_key = '', $mandatory = false, $immediate = false, $ticket = null); 参数: $msg 消息内容 $exchange 交换器 $routing_key routing_key $mandatory 匹配不到队列时,是否立即丢弃消息 $immediate 队列无消费者时,是否立即丢弃消息 $ticket
消费消息
$channel->basic_consume( $queue = '', $consumer_tag = '', $no_local = false, $no_ack = false, $exclusive = false, $nowait = false, $callback = null, $ticket = null, $arguments = array() ) 参数: $queue 队列名 $consumer_tag $no_local $no_ack 是否自动ack true自动 false手动 $exclusive $nowait $callback 消息回调函数 $ticket $arguments