当前位置: 首页 > 知识库问答 >
问题:

php rabbitmq使用者重新连接

邴越彬
2023-03-14

我有一个PHP应用程序,使用RabbitMQ。为了实现冗余,我创建了一对RabbitMQ服务器,并将它们连接到一个集群中。我也有一个VyOS故障转移群集运行HAProxy负载平衡连接,并在故障转移的情况下提供重新路由。

昨天,我们的VyOS集群决定需要故障转移(可能是短暂的网络中断)。在一个VyOS上停止了HA代理,虚拟IP被移动,并在另一个节点上重新启动HA代理。

之后,我查看了Rabbit中的队列,发现每个队列都没有消费者。我检查了运行PHP的机器,消费者仍然在运行PHP。我离开他们一段时间,看看他们是否能重新连接,但他们没有。我不得不杀死PHP脚本并重新启动它们,它们重新连接并立即开始使用。

我认为RabbitMQ和HAproxy正在按预期工作。。。现在我需要PHP使用者支持故障转移事件。。。换句话说,它需要检测断开连接并自动重新连接,而不仅仅是挂起。

这是我的RabbitMQ类。提前感谢任何帮助!

<?php
while(true)
{
    try{getMessages("transcode2");}
    catch(Exception $e){echo($e->getMessage()."\n");}
    sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
    global $channel;
    $msg=json_encode($msg);
    $queue="transcode2";
    $channel->queue_declare($queue,true,false,false,false);
    $channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
    global $connection,$channel;
    $connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
    $channel=$connection->channel();
    $channel->queue_declare($queue,true,false,false,false);
    $callback=function($msg)
    {
        if(handleMessage(json_decode($msg->body,true)))
        {
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }
        else
        {
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
        }
    };
    $channel->basic_qos(null,1,null);
    $channel->basic_consume($queue,'',false,false,false,false,$callback);
    while(count($channel->callbacks))
    {
        try{$channel->wait();}
        catch(Exception $e)
        {
            break;
        }
    }
    $channel->close();
    $connection->close();
}
?>

共有2个答案

东郭腾
2023-03-14

零超时无法正常工作,正如您所说的,代理可能会关闭连接,而PHP使用者没有发现它。

解决方案是不使用零接收超时。确保连接超时大于接收超时。

下面是一个基于AMQP互操作的示例:

安装AMQP Interop兼容传输,例如:

composer require enqueue/amqp-bunny

代码执行与显式设置超时相同的操作:

<?php
use Enqueue\AmqpBunny\AmqpConnectionFactory;
use Interop\Amqp\AmqpConsumer;
use Interop\Amqp\AmqpMessage;
use Interop\Amqp\AmqpQueue;

$context = (new AmqpConnectionFactory(sprintf(
    'amqp://%s:%s@%s:%s/%2f?connection_timeout=600', // 10 min
    RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT
)))->createContext();

$context->setQos(null,1,null);

//sendMessage

$queue = $context->createQueue("transcode2");
$queue->addFlag(AmqpQueue::FLAG_PASSIVE);
$context->declareQueue($queue);

$message = $context->createMessage(json_encode($msg));
$message->setPriority($prio);

$producer = $context->createProducer();
$producer->send($queue, $message);

// getMessages

$consumer = $context->createConsumer($queue);
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) {
    if(handleMessage(json_decode($message->getBody(), true))) {
        $consumer->acknowledge($message);
    } else {
        $consumer->reject($message);
    }

    return true;
});

$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now.

$context->consume($receiveTimeout);
师成弘
2023-03-14

如果将timeout参数用于$channel,则可能会起作用-

 类似资料:
  • 我正在与eclipse泛美卫生组织和蚊子工作。我正在发送QOS-1级别的消息。我已经改变了mosquitto的配置,就像在这个问题中回答的那样。我正在使用mqtt镜头来测试。在用户断开连接后,Mosquitto不发送消息。

  • 但是,即使我将节点显式设置到属性中,它们也不会尝试重新连接到有效的kafka节点。 如何使我的使用者在他们连接的kafka节点失败后重新连接到有效的kafka节点?

  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 问题内容: 我有一个问题,如果mySQL Server在“睡眠时间” 500秒后终止了会话,则下一个请求不会成功。如果mySQL Server没有关闭睡眠连接,则可能在700秒后发生相同的问题。 我能做什么?遵循我的persistence.xml的属性 如果重要的话,transactiontype为RESOURCE_LOCAL。 问题答案: 您如何配置连接池?如果它是服务器数据源,则应在服务器中设

  • 重新建立mqtt连接,前提是必须已经通过Iot_id,Iot_pwd建立过一次mqtt连接。 请求方式: "|4|1|5|\r" 返回值: "|4|1|1|1|\r" mqtt连接状态:连接成功 "|4|1|1|2|reason|\r" mqtt连接状态:连接失败,字符串reason表示失败的原因 Arduino样例: softSerial.print("|4|1|5|\r");

  • 重新连接上一次连接的wifi。 请求方式: "|2|3|\r" 返回值: "|2|1|\r" wifi连接状态:wifi断开连接 "|2|2|\r" wifi连接状态:正在连接wifi "|2|3|ip|\r" wifi连接状态:wifi连接成功,返回OBLOQ的ip地址 "|2|4|\r" wifi连接状态:wifi连接失败,检查SSID和PWD是否输入正确 Arduino样例: softSer