我在我的消费者体内抛出一个AMQP异常。我的期望是,消息将以FIFO顺序返回队列,并在将来的某个时候重新处理。
Spring AMQP似乎没有将消息释放回队列。而是一次又一次地尝试重新处理失败的消息。这会阻止处理新到达的消息。卡在AMQP控制台内的设备将永远处于“未打包”状态。
有什么想法吗?
这是我用来解决这个问题的一个解决方案。我设置了一个拦截器,在应用后退策略时重试消息x次。http://trippstech.blogspot.com/2016/03/rabbitmq-deadletter-queue-with.html
这就是Rabbitmq/Spring AMQP的工作方式;如果消息被拒绝(抛出任何异常),默认情况下消息将被重新排队,并放回队列的头部,以便立即重试。
... 在将来的某个时候再加工。
为了实现这一点,你必须对事情进行适当的配置。
首先,您必须告诉代理不要重新查询消息。这是通过将侦听器容器上的defaultRequeueRejected
设置为false(默认为true)来实现的。或者,您可以抛出一个AmqpRejectAndDontRequeueException
,它指示容器拒绝(而不是重新请求)单个消息。
但这还没有结束;这样做只会导致被拒绝的消息被丢弃。
为了避免这种情况,您必须为队列设置死信交换/队列——拒绝的消息然后被发送到DLX/DLQ,而不是被丢弃。通常建议使用策略而不是队列参数。
最后,您可以设置消息在DLQ上的存活时间,以便在该时间之后,消息将从队列中删除。如果您在该队列(DLQ)上设置了另一个适当的死信交换,您可以在时间到期后将消息重新发送回原始队列。
请注意,这只适用于原始队列中被拒绝的交付;当该队列中的消息过期时,它将不起作用。
有关更多详细信息,请参阅此答案及其问题中的一些链接。
您可以使用x-death
标题的内容来决定在多次尝试后是否应该完全放弃(捕获异常并以某种方式处理错误消息;不要抛出异常,容器将确认消息)。
ActiveMQ/JMS有一个内置机制,用于确保在使用竞争消费者模式时,共享公共报头(即JMSXGroupID报头)的消息始终由队列的同一消费者使用。队列的使用者完全不知道实际的头值,因为具有公共头的消息的保证是在服务器端而不是在使用者端执行的。有关此工作方式的更多详细信息,请参见http://activemq.apache.org/message-groups.html。 用AMQP或者用Rab
问题内容: 在Java中,我们使用try catch块处理异常。我知道我可以像下面这样编写一个try catch块来捕获方法中抛出的任何异常。 但是Java中有什么方法可以让我在发生异常时获取一种称为的特定方法,而不是像上面的方法那样编写一个包罗万象的方法? 具体来说,当抛出异常(我的应用程序逻辑未处理)时,我想在Swing应用程序中显示一条用户友好的消息。 谢谢。 问题答案: 默认情况下,JVM
问题内容: 当我的程序运行引发异常的行时,是否可以启动IPython Shell或提示? 我对引发异常的上下文,变量,作用域(和子作用域)最感兴趣。类似于Visual Studio的调试,当引发异常但未被任何人捕获时,Visual Studio将停止并为我提供调用堆栈和每个级别存在的变量。 您是否认为有办法使用IPython获得类似的东西? 编辑:启动IPython时,该选项似乎并没有达到我想要的
我想使用rabbitMq队列中Storm喷口中的消息。 现在,我们使用Spring AMQP异步发送和接收来自RabbitMq的消息。 Spring AMQP提供了从队列读取消息的机制(创建监听器或使用注释@RabbitListner)。 问题是我可以让一个侦听器从队列中读取消息。但是,我如何将此消息发送到Storm群上运行的Storm喷口? 拓扑将启动一个集群,但在我的spout的nextTup
我有一个生产者和一个消费者。消费者的多个实例正在运行。当生产者发布消息时,我的意图是通过所有实例消费该消息。所以,我使用的是直接交换。生产者将带有主题的消息发布到直接交换。消费者正在通过独占队列收听该主题。当消费者启动并且生产者发布消息时,此过程运行良好。但是当消费者关闭并且生产者发布消息时,消费者在启动时不会消费此消息。 我在谷歌上搜索了这个问题。建议使用命名队列。但是,如果使用命名队列,则消息