当前位置: 首页 > 知识库问答 >
问题:

使用者basic_cancel之后的RabbitMq未确认消息

甘骞尧
2023-03-14

好吧,我不需要详细介绍我设置的整个系统,

我遇到的问题是,当使用者取消(amqpchannel->basic_cancel)对队列的监听时,会留下一个额外的消息未被这个工作者确认。它也不会触发正常的回调来处理此消息。

  • 队列正在阻塞(使用wait)while(count($channel->回调)){$channel->wait(...)...}
  • 预取为1,这是您所能拥有的最小值
  • 使用者可以动态侦听(amqpchannel->basic_consument)
  • 使用者可以动态忘记(amqpchannel->basic_cancel)

我不会告诉给定使用者使用或取消给定队列的确切方式。但这一切都是完美的工作,开始消费或取消只是很好。但是,当我取消一个仍然有消息的队列时,它们就会忘记最后一条消息,我认为取消会删除该队列的回调,因此除了杀死使用者之外,没有办法返回该消息,这是不希望的。

我做了一些调试(只是一个示例,而不是我实际做的事情)

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
   $tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
   $this->getAmqpChannel()->basic_cancel( $tag );
   Debug::dump($this->getAmqpChannel()->getMethodQueue());

其输出大致为

  array()
  RunCommand: basic_cancel //this works fine consumer forgets queue except ->
  array(1){
    [0] => array(3){
        [0] => string(5) "60,60",
        [1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
       [2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
            ["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
            ["DELIVERY_MODE_PERSISTENT":constant] => int(2),
            ["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
            ["body_size":public] => int(135864),
            ["is_truncated":public] => bool(false),
            ["content_encoding":public] => null,
            ["propertyDefinitions":protected static] => array(14){ ... }
            ["delivery_info":public] => array(0){},
            ["prop_types":protected] => array(14){ ... }
      }
    }

一旦worker死了(或者我杀死了它),消息就会被放回队列中,我可以在get Messages下的RabbitMq管理工具(插件)中取出它。就在那里,

  Properties
    correlation_id: 32:38
    delivery_mode:  2
    content_encoding:   text/plain
    content_type:   application/json

“correlation_id”:32,“max_correlation_id”:38对应于correlation_id:32:38,因为我需要跟踪消息部分。所以我知道这是同样的信息。

而且,这也不是一次性的,每次我取消一个仍然有消息在其中的队列时,它都会发生。所以它与给定的消息无关。这就像它得到了最后一个预取的消息,然后因为它被取消了,所以没有回调来运行最后一个消息,它就被困在了边缘。记住0预取是提取所有消息,1是您可以设置的最低值。

任何能帮助的都是好的。

更新

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

我试图避免使用recover,但我认为这应该是可以的,因为消费者使用的是一个单一的通道,并且是阻塞的,在最坏的情况下,它只会拒绝一个有效消息一次,尽管这不是理想的,但应该是可以接受的。

但是,在某些情况下,我会从Rabbit得到一个额外的异常,

  PRECONDITION_FAILED - unknown delivery tag {n}

如果任何人有任何关于这个附加错误的细节,那将是很好的。而且所有的队列都需要Ack,没有一个是自动的。

更新1

但在最近的审计中,我注意到每日用户只需要处理大约1000万行(大约100分钟的工作时间),而夜间用户需要处理大约1亿行(大约20小时的工作时间)。每日用户可以在工作时间之外执行夜间作业(因为它减少了白天的响应时间),因此只有大约10个小时的窗口,夜间作业只能在一个小得多、能力差得多的服务器上运行。这给我们的解决方案是,如果没有日常作业(客户机提交的作业),它们可以动态地交换到夜间作业(数据仓库)。这将保持大部分响应性,同时不会在没有作业提交时浪费资源。我们可以在搜索上横向扩展,但我们确实为我们的主服务器付出了很多,并且浪费了我们可以做的大约8个小时的工作。

我可能会填满一本小书的工作原理,但希望这能提供一些基本的想法,我在做什么。我也被一些不披露和不竞争的东西约束在我的合同中,所以我可以真正进入具体的细节。

共有1个答案

帅煌
2023-03-14

在RabbitMQ短语中,consumer表示队列中的订阅者。(有关channel、consumer和connection之间差异的详细信息,请参见本答案)。

打开确认时,将为通道打开确认。在该通道上传递的任何消息都将有一个与其相关联的传递标记。当您处理完消息时,您需要通过同一通道告诉服务器消息已被处理。取消使用者对确认已传递的消息没有影响。事实上,接收消息、取消消费者、处理消息,然后发送确认,这将是一个非常有效的用例。

因此,您有两个选择。您可以将消息保留为未确认的状态,在这种情况下,您所要做的就是关闭通道,它将重新排在队列的首位。或者,您可以确认它(NACKACK),在这种情况下,如果NACK消息将被重新排队,如果ACK消息将被丢弃。

如果我没记错的话,不指定预取计数(通过basic.qos)将导致预取为零,这意味着您必须在接收下一条消息之前对上一条消息进行确认。我在这件事上可能是错的。当然,如果您使用basic.get,则可以完全避免此问题,并且对性能的影响很小。

 类似资料:
  • 问题内容: 我的Java应用程序将消息发送到RabbitMQ交换,然后交换将消息重定向到绑定队列。我将RabbitMQ与Springframework AMQP java插件一起使用。 问题:消息进入队列,但消息始终处于“未确认”状态,永远不会变为“就绪”状态。 可能是什么原因? 问题答案: 一条未确认的消息表示您的使用者已经读取了该消息,但是该使用者从未将ACK发送回RabbitMQ代理以表示它

  • 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上的消息都会被指派一个唯一的 ID(从一开始),一旦消息被投递到所有匹配的队列后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了 如果消息和队列是持久化的,那么确认消息会在将消息写入磁盘后发出,broker 回传给生产者的确认消息中 ,

  • 我将不折不扣地学习以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。 我以这样的方式启动RabbitMQ服务器: 我生成了两个消费者,当我Ctrl+C其中一个时,另一个正在运行的消费者不会接收到最初发往前一个消费者的消息。如何在Ctrl+C'ing从一个消费者中重新传递消息? 编辑:我现在正在通过'brew'安装Rabbi

  • 主要内容:8. 发布高级确认,8.1 发布确认SpringBoot版本,8.2 回退消息,8.3 备份交换机8. 发布高级确认 在生产环境中由于一些不明原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复 于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况下,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢? 8.1 发布确认SpringBoot版本 8.1.1 发布确认方案 当交换机

  • 以前我读取队列中的所有消息,但现在我必须根据用户的选择返回特定数量的消息(计数)。 我试着相应地改变for循环,但是由于自动应答,它读取了所有的消息。所以我尝试在配置文件中将它改为手动。 在我的程序中,如何在读取 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收,我没有频道的参考)? 任何帮助都是非常值得赞赏的,提前表示感谢。

  • 我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队