我有一个使用Spring和RabbitMQ的项目设置。目前,我的应用程序可能会收到一条amqp消息,在另一个异步进程完成之前无法处理该消息(遗留和完全分离,我无法控制)。因此,结果是我可能不得不等待处理消息一段时间。其结果是变压器出现异常。
当消息被NACK回rabbitMQ时,它会将其放回队列的头部,并立即重新拉入队列。如果我收到的无法处理的消息等于并发侦听器的数量,我的工作流就会被锁定。它转动轮子,等待消息变得可处理,即使队列中有有效的可处理消息在后面等待。
有没有办法拒绝和amqp消息,让它回到队列的尾部?从我的研究RabbitMQ曾经这样工作过,但现在我似乎只得到队列的头。
我的配置相当直接,但为了连续性,这里是...
连接工厂是:org.springframework.amqp.rabbit.connection.CachingConnectionFactory RabbitMQ 3.1.1
Spring集成:2.2.0
<si:channel id="channel"/>
<si-amqp:inbound-channel-adapter
queue-names="commit" channel="channel" connection-factory="amqpConnectionFactory"
acknowledge-mode="AUTO" concurrent-consumers="${listeners}"
channel-transacted="true"
transaction-manager="transactionManager"/>
<si:chain input-channel="channel" output-channel="nullChannel">
<si:transformer ref="transformer"></si:transformer>
<si:service-activator ref="activator"/>
</si:chain>
你说的没错RabbitMQ在一段时间前被更改了。API中没有任何内容可以更改行为。
当然,您可以在入站适配器上放置一个错误通道
,然后是一个变压器(表达式="payload.failedMessage"
),然后是一个出站适配器,该适配器配置有适当的交换/路由键来请求队列后面的消息。
您可能想要在错误流中添加一些额外的逻辑来检查异常类型(payload.cause
)并决定您想要的操作。
如果错误流本身抛出异常,原始消息将像以前一样在头部重新排队;如果它正常退出,消息将被确认。
我有一个应用程序,它使用spring AMQP向其他应用程序消费和生成消息。我有一个场景,其中发生了一些异常,我需要重新排队回到RabbitMQ。对于一些例外情况,我需要忽略(基本上我需要忽略消息,无需重新查询) 目前在下面的代码中,我已经将配置设置为 工厂setDefaultRequeueRejected(假); 但我的要求是动态拒绝某些消息,并将某些消息重新排队回RabbitMQ。 请建议
我有交换和排队。生产者不需要消费确认,但在某些情况下,由于缺乏其他数据,消费者在当前时刻可能无法处理消息。因此,我想将这些消息返回到队列的末尾。怎么做?还是在我拒绝邮件时自动完成? 流量: Message1被使用并在数据库中创建一些记录 所以存在消息排序问题,在一般情况下,我会按顺序获取消息,因为大多数组件都能正确地传递消息。我想解决一个潜在的问题,当Message1的制作人由于负载过重或其他原因
我翻阅了rabbitmq文档,似乎rabbitmq不处理消息重新传递计数。如果我要手动确认/NACK消息,我需要将重试计数保存在内存中(例如,使用correlationId作为映射中的唯一键),或者在消息中设置我自己的头并重新发送它(从而将其放在队列的末尾) 然而,这是一个Spring处理的情况。具体来说,我指的是RetryInterceptorBuilder.stateful()。maxAtte
这就是事情。 我正在使用PHP AMQP从Rabbitmq读取结果队列,以便处理发送的每封电子邮件上的重要信息。完成后,我需要将该消息删除或标记为已写入,以便下次读取队列时,不会得到已处理的消息。 由于Rabbitmq服务器每小时发送超过10.000封电子邮件,每次我读取队列以处理结果发送时,脚本至少可以运行5分钟,以便处理队列中的所有消息,因此在完成后,在这5分钟内会发送数百条新消息。这使得我无
为什么已经拥有了共享内存时需要消息队列呢? 这将是多种原因,让我们将其分解为多个点来简化 - 据了解,一旦消息被一个进程接收到,它将不再可用于任何其他进程。 而在共享内存中,数据可供多个进程访问。 如果想使用小信息格式进行通信。 当多个进程同时进行通信时,共享内存数据需要同步保护。 使用共享内存的写入和读取频率很高,那么实现功能将会非常复杂。 在这种情况下不值得使用。 如果所有的进程不需要访问共享
一、消息模型 点对点 发布/订阅 二、使用场景 异步处理 流量削锋 应用解耦 三、可靠性 发送端的可靠性 接收端的可靠性 参考资料 一、消息模型 点对点 消息生产者向消息队列中发送了一个消息之后,只能被一个消费者消费一次。 发布/订阅 消息生产者向频道发送一个消息之后,多个消费者可以从该频道订阅到这条消息并消费。 发布与订阅模式和观察者模式有以下不同: 观察者模式中,观察者和主题都知道对方的存在;