当前位置: 首页 > 知识库问答 >
问题:

Spring AMQP是否基于JVM重新排队消息计数?

池兴邦
2023-03-14

我翻阅了rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动确认/NACK消息,我需要将重试计数保存在内存中(例如,使用correlationId作为映射中的唯一键),或者在消息中设置我自己的头并重新发送它(从而将其放在队列的末尾)

然而,这是一个Spring处理的情况。具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAttemts(x)。不过,这个计数是特定于JVM的,还是以某种方式操纵消息?

例如,我有一个Web应用程序部署到2台服务器,maxAttemcript设置为5。是否有可能总的重新交付计数在5-9之间,这取决于它在2台服务器之间重新交付和重新处理的顺序?

共有1个答案

程赞
2023-03-14

Rabbit/AMQP不允许在基于拒绝的重新执行时修改消息。

状态(基于MessageId)是在一个RetryContextCache中维护的;默认值是一个MapRetryContextCache。这并不真正适合于一个集群,因为,正如你所说,尝试最多可能是(((maxAttemts-1)*n 1);另外,它会导致内存泄漏(某些服务器上的状态)。您可以在RetryTemboardRetryAction中配置一个SoftReancceMapRetryContextCache,以避免内存泄漏,但这只能解决内存泄漏。

您需要使用自定义的RetryContextCache与一些持久共享存储(例如redis)。

我通常建议在这种情况下使用无状态恢复——重试完全在容器中完成,根本不涉及兔子(直到重试用尽,在这种情况下,消息将被丢弃或发送到DLX/DLQ,这取决于代理配置)。

如果您不关心消息顺序(我假设您没有考虑到存在竞争的消费者),一种有趣的技术是拒绝消息,将其发送到具有过期设置的DLQ,当DLQ消息过期时,将其路由回原始队列的尾部(而不是头部)。在这种情况下,可以检查x-death标头以确定其重试次数。

这个回答和这个有更多的细节。

 类似资料:
  • 我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。 当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动

  • 我正在为Azure服务总线使用最新的Java绑定(V3.1.3):https://github.com/Azure/azure-sdk-for-java/tree/master/sdk/servicebus 当我创建一个新的队列客户端,计划一条消息,然后取消它... ...代码似乎按预期工作:活动消息计数变为0。但是一旦被调度的消息到达它应该被调度的时间(我测试了10秒和100秒以后),消息有时会

  • 我有一个应用程序,它使用spring AMQP向其他应用程序消费和生成消息。我有一个场景,其中发生了一些异常,我需要重新排队回到RabbitMQ。对于一些例外情况,我需要忽略(基本上我需要忽略消息,无需重新查询) 目前在下面的代码中,我已经将配置设置为 工厂setDefaultRequeueRejected(假); 但我的要求是动态拒绝某些消息,并将某些消息重新排队回RabbitMQ。 请建议

  • 在我对Artemis LastValueQueue代码的测试和回顾中,消息的调度延迟似乎优先于对“last value key”的评估。换言之,如果您安排了一条消息,那么它只会在准备传递时替换队列中的最后一个值。 我的问题是我是否正确理解了代码,如果是,是否有一个解决方案或ActiveMQ/Artemis的功能可以帮助满足我们的要求。 我们的要求如下: 生成一条消息,并将该消息的处理延迟到将来的某

  • 我们正在两个不同的实例上运行云服务。这个云服务在服务总线队列上旋转一个接收器。此接收器配置为:1。窥视并锁定模式2。自动完成=真3。RenewLockTimeout=10分钟4。MaxConcurrentMessages=1 5。PrefetchCount=默认值(应为0) 几天前,Azure决定更新我们的实例。我从日志中看到,在18.18时,第一个实例被请求停止。在18.24时,该实例停止,并在

  • 新服务器密钥是否仅限于消息传递? 说明:在firebase项目设置中,我可以获得“旧”和新服务器密钥(云消息选项卡)。旧版本无法通过发送推送通知https://fcm.googleapis.com/fcm/send 因为响应说它是一个遗留服务器密钥。但在这里,它可以被限制在某些谷歌API中https://console.developers.google.com/apis. 谷歌API控制台中没有