当前位置: 首页 > 知识库问答 >
问题:

如何从AMQP(RabbitMQ)队列中删除消息?

宋明亮
2023-03-14

这就是事情。

我正在使用PHP AMQP从Rabbitmq读取结果队列,以便处理发送的每封电子邮件上的重要信息。完成后,我需要将该消息删除或标记为已写入,以便下次读取队列时,不会得到已处理的消息。

由于Rabbitmq服务器每小时发送超过10.000封电子邮件,每次我读取队列以处理结果发送时,脚本至少可以运行5分钟,以便处理队列中的所有消息,因此在完成后,在这5分钟内会发送数百条新消息。这使得我无法在脚本完成后清除队列,因为它会删除脚本运行期间未处理的消息位置。

这让我只有一个选择。标记或删除刚刚被我的AMQP脚本处理或读取的消息。

有办法吗?(以下是脚本)

<?php
/**
 *  result.php
 *  Script that connects to RabbitMQ, and takes the result message from
 *  the result message queue.
 */

// include the settings
 require_once('settings.php');

// try to set up a connection to the RabbitMQ server
try
{
    // construct the connection to the RabbitMQ server
    $connection = new AMQPConnection(array(
        'host'      =>  $hostname,
        'login'     =>  $username,
        'password'  =>  $password,
        'vhost'     =>  $vhost
    ));

    // connect to the RabbitMQ server
    $connection->connect();
}
catch (AMQPException $exception)
{
    echo "Could not establish a connection to the RabbitMQ server.\n";
}

// try to create the channel
try
{
    // open the channel
    $channel = new AMQPChannel($connection);
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost (creating channel).\n";
}

// try to create the queue
try
{
    // create the queue and bind the exchange
    $queue   = new AMQPQueue($channel);
    $queue->setName($resultbox);
    $queue->setFlags(AMQP_DURABLE);
    $queue->bind('exchange1', 'key1');
    $queue->declare();
}
catch (AMQPQueueException $exception)
{
    echo "Channel is not connected to a broker (creating queue).\n";
}
catch (AMQPConnectionException $exception)
{
    echo "Connection to the broker was lost. (creating queue)/\n";
}

// Get the message from the queue. 
while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
}
    $queue->purge();

// done, close the connection to RabbitMQ
$connection->disconnect();
?>

共有1个答案

山越
2023-03-14

确认消息$queu-

UPD:

根据您的代码:

while ($envelope = $queue->get()) {
    //Function that processes the message
    process_message($envelope->getBody());
    $queue->ack($envelope->getDeliveryTag());
}
while ($envelope = $queue->get(AMQP_AUTOACK)) {
    //Function that processes the message
    process_message($envelope->getBody());
}

附笔。:

查看AMQPQueue::consume文档,看起来它更适合这里。

$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
});
$queue->consume(function ($envelope, $queue) {
        process_message($envelope->getBody());
        $queue->ack($envelope->getDeliveryTag());
}, AMQP_AUTOACK);

结论:我建议使用#3溶液,但这取决于你。

 类似资料:
  • 我安装了Rabbitmqadmin,并能够列出所有的交换和队列。如何使用Rabbitmqadmin或Rabbitmqctl删除所有队列。

  • 我最近将一台服务器从ActiveMQ从5.8升级到了最新版本(5.11.1)。从那以后,我偶尔注意到,消息将在特定队列中累积,而不会被删除。 我们的架构有一个生产者,一个消费者。我可以看到消费者仍然保持联系,但制作人的信息越来越多。我的解决方案是通过web控制台删除队列。之后,我立即看到消费者重新连接,消息再次开始处理。 如果相关,在这种情况下,生产者正在运行NMS。NET和消费者在Java 1.

  • 场景:微服务从RabbitMQ队列中拾取消息,将其转换为对象,然后微服务对外部服务进行REST调用。 它将处理成千上万条这样的消息,如果我们知道外部Rest服务关闭,有没有办法告诉我的消费者不要从队列中拾取消息? 我知道我可以在收到一条单独的消息后重试,但如果我知道它被拒绝了,我甚至不想再收到它。我不想在DLQ中处理成千上万的消息。 而且它感觉像一个断路器设计模式,但我找不到任何具体的例子来说明如

  • 在队列选项卡的rabbitMQ web界面上,我看到了“概述”面板,我在其中找到了以下内容: 排队消息: 准备好了 未确认 总数 我猜“总数”是多少。但什么是“准备就绪”和“未确认”?“准备好了”——传递给消费者的信息?“未确认”-? 消息费率: 发表 交付 重新交付 承认 这些信息是什么?尤其是“重新交付”和“确认”?这是什么意思?

  • 如何从weblogic JMS队列中清除/删除消息 我的是一个集群环境,在服务中使用。 获取以下异常。

  • ActiveMQ/JMS有一个内置机制,用于确保在使用竞争消费者模式时,共享公共报头(即JMSXGroupID报头)的消息始终由队列的同一消费者使用。队列的使用者完全不知道实际的头值,因为具有公共头的消息的保证是在服务器端而不是在使用者端执行的。有关此工作方式的更多详细信息,请参见http://activemq.apache.org/message-groups.html。 用AMQP或者用Rab