当前位置: 首页 > 知识库问答 >
问题:

尽管DefaultRequeuerEjected=false,但rabbit代理中的消息仍未确认

鲍建业
2023-03-14

我的场景:我向我的Rabbit代理发布了两条消息,在处理第一条消息时发生了一个未处理的异常。

我看到的情况:有3次重试尝试处理消息,但由于我的强制异常而失败(请参见下面errorhandler中的代码)。

BlockingQueueConsumerConsumer标记是相同的,所以我猜测BlockingQueueConsumer没有重新启动。但是,下面的日志显示它确实在继续等待消息。

我想知道为什么BlockingQueueConsumer没有nack消息,以及为什么尽管日志中有证据表明使用者正在等待消息,但后续的消息没有被使用。

任何建议或背景信息将非常欢迎!

@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, Queue mainQueue, RetryOperationsInterceptor retryOperationsInterceptor) {
    SimpleMessageListenerContainer retVal = new SimpleMessageListenerContainer(connectionFactory);
    retVal.addQueues(mainQueue);
    retVal.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    retVal.setDefaultRequeueRejected(false);
    retVal.setAdviceChain(new Advice[]{retryOperationsInterceptor});
    return retVal;
}

@Bean
public RetryOperationsInterceptor retryOperationsInterceptor () {
    return stateless().recoverer(new RejectAndDontRequeueRecoverer()).build();
}

<int-amqp:inbound-channel-adapter
    channel="fromRabbitChannel"
    error-channel="errorChannel"
    listener-container="simpleMessageListenerContainer"
    />

<int:service-activator ref="errorHandler" input-channel="errorChannel" method="handleError"/>

@MessageEndpoint
public class ErrorHandler {
    public void handleError(Message<MessagingException> message) throws IOException {
        throw new IllegalStateException("FORCED EXCEPTION");
    }
}

09:49:38.219 [SimpleAsyncTaskExecutor-1] INFO  c.p.a.f.ErrorHandler - Throwing an exception!!
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Checking for rethrow: count=3
09:49:38.219 [SimpleAsyncTaskExecutor-1] DEBUG o.s.retry.support.RetryTemplate - Retry failed last attempt: count=3
09:49:38.220 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.r.RejectAndDontRequeueRecoverer - Retries exhausted for message (Body:'[B@c78ef32(byte[97])'MessageProperties [blah blah])
    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:865) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:760) [spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:680) [spring-rabbit-1.5.2.RELEASE.jar:na]
....
....
09:49:38.221 [SimpleAsyncTaskExecutor-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover(RejectAndDontRequeueRecoverer.java:44) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:59) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean$1.recover(StatelessRetryOperationsInterceptorFactoryBean.java:53) ~[spring-rabbit-1.5.2.RELEASE.jar:na]
at org.springframework.retry.interceptor.RetryOperationsInterceptor$ItemRecovererCallback.recover(RetryOperationsInterceptor.java:124) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:458) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:320) ~[spring-retry-1.1.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:168) ~[spring-retry-1.1.2.RELEASE.jar:na]
....
....
09:49:38.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0
09:49:39.222 [SimpleAsyncTaskExecutor-1] DEBUG o.s.a.r.l.BlockingQueueConsumer - Retrieving delivery for Consumer: tags=[{amq.ctag-XVCBQNXxCMFERaF1kbeI3Q=debitCardStatusQueue}], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5671/,1), acknowledgeMode=MANUAL local queue size=0

共有1个答案

杜彦君
2023-03-14

Retval.SetAcknowdgEmode(AcknowdgEmode.Manual);

使用手动ack,您负责ack或拒绝消息;如果您将模式设置为“自动”,容器将仅ACK/NACK;然后它将完全按照您的要求执行。

 类似资料:
  • 我创建了一个具有事务 和 的生产者,该创建器将 99 条消息放在队列中。 我为具有不同会话的同一队列创建了一个消费者,并处理了和。 我没有确认第一条消息,但确认了其余98条消息。 我打开ActiveMQ管理控制台,预计会看到1条消息挂起/1条在队列中,但令我惊讶的是,我看到所有99条消息都已出列。 有人能指出我哪里出错了吗?

  • 在我的应用程序中,我有3个活动,主要活动,次要活动和第三次活动。我希望SecondaryActivity成为Android 6上特定域的默认应用程序链接处理程序,如本指南所述。同时,我希望另一个活动TertiaryActivity能够处理来自另一个域的链接,但不是默认处理程序,因为我不拥有该域。下面是我的AndroidManifest来说明: 我阅读了这本关于应用程序链接的广泛指南,它解释了And

  • 问题内容: 我不明白…是我还是这是节点中的错误? 可以按预期进行: 这发出了警告: 我懂了 问题答案: 使用与承诺回报新希望(这就是所谓的链接)。因此,当您执行以下操作时: 您的最初承诺在哪里,您将在第1行上创建一个新的承诺(现在不再存在。我们称它为 )。因此,即使您使用with ,也不会处理上的拒绝,这解释了您在控制台上看到的消息。 为了避免出现此消息,您应该在第1行的新承诺中添加a

  • 我有一个关于Rabbitmq交付稳定性的问题。 安兔兔mq官方主页称,确认电话保证至少一次送达。 生产者生成的消息是否可以多次传递?(我想知道消费者是否可以对同一条消息进行重复工作。) 我英语说得不好。内容可能很奇怪。我希望你能理解。

  • 商城消息管理 系统集中对消息管理进行了整合,这里商家可以对短信、邮箱、微信模板、商家接收的消息进行配置。商家下单会有消息通知。 这里有会员消息和商家消息的模板 具体可参考配置

  • 我正在使用 发送和 对于现在从rappid mq接收消息,我希望使用类似以下内容的侦听器: 问题是onMessage监听器与Messages一起工作是否有可能在类似的函数中接收简单的可序列化对象?