composer安装地址:https://packagist.org/packages/php-amqplib/php-amqplib
搭建和配置rabbitmq:https://blog.csdn.net/why444216978/article/details/84570801
安装PHP-AMPQ:https://blog.csdn.net/why444216978/article/details/84571127
PHP-AMPQ扩展使用,方便理解rabbitmq和三种交换机:https://blog.csdn.net/why444216978/article/details/102534283
通过shell脚本监控消费者举例:https://blog.csdn.net/why444216978/article/details/84987048
也可参考使用supervisord:https://blog.csdn.net/why444216978/article/details/102153705
Rabbitmq.php
<?php
require_once './vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class Rabbitmq
{
protected $channel;
public function __construct($name = 'default')
{
//$this->_logSrv = &load_class('Log', 'core');
//$this->_conf = C('rabbitmq');
$this->_name = $name;
}
public function connect()
{
$connection = new AMQPStreamConnection('localhost', 5672, 'why', 'why', 'why', false, 'AMQPLAIN', null, 'en_US', 3.0, 3.0, null, false, 0);
$this->channel = $connection->channel();
return true;
}
/**
* @param $messageBody 消息内容
* @param $exchange 交换机名
* @param $routeKey 路由键
* @param array $head
* @return bool
*/
public function publish($messageBody, $exchange, $routeKey, $head = [])
{
if (!$this->connect()) {
return false;
}
$head = array_merge(array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT), $head);
$message = new AMQPMessage($messageBody, $head);
$res = $this->channel->basic_publish($message, $exchange, $routeKey);
return $res;
}
/**
* @param $queue 队列名
* @param $consumerTag 路由键(这里自动遵循交换机类型匹配规则,默认已经创建好)
* @param $call_back 回调处理方法
* @return bool
*/
public function consumer($queue, $consumerTag, $call_back)
{
if (!isset($call_back) || empty($call_back)) {
return false;
}
if (!$this->connect()) {
return false;
}
$this->channel->basic_qos(null, 10, null);
$this->channel->basic_consume($queue, $consumerTag, false, false, false, false, $call_back);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
static public function consumer_ack($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
}
producer.php
<?php
require_once './Rabbitmq.php';
$rabbitmq = new Rabbitmq();
$res = $rabbitmq->connect();
if($res){
echo 'connect success!' . "\n";
$msg = ['key' => 'name', 'value' => 'weihaoyu'];
$msg = json_encode($msg);
$rabbitmq->publish($msg, 'why_exchange', 'why_route');
echo 'send success:' . $msg . "\n";
}else{
echo 'connect error!' . "\n";
}
consumer.php
<?php
require_once './Rabbitmq.php';
$callBack = function ($message){
//print_r($message);
$body = $message->body;
if(!empty($body)){
echo "\n" . 'consume success:' . $body . "\n";
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
};
$rabbitmq = new Rabbitmq();
$res = $rabbitmq->connect();
$rabbitmq->consumer('why_queue', 'why_route', $callBack);
生产:
whydeMacBook-Pro:php why$ php producer.php
connect success!
send success:{"key":"name","value":"weihaoyu"}
消费:
consume success:{"key":"name","value":"weihaoyu"}