当前位置: 首页 > 面试题库 >

如何要求RabbitMQ在Spring异步MessageListener用例中发生业务异常时重试

杜英范
2023-03-14
问题内容

我有一个Spring AMQP消息监听器正在运行。

public class ConsumerService implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        try {
            testService.process(message); //This process method can throw Business Exception
        } catch (BusinessException e) {
           //Here we can just log the exception. How the retry attempt is made?
        } catch (Exception e) {
           //Here we can just log the exception.  How the retry attempt is made?
        }
    }
}

如您所见,在处理过程中可能会出现异常。我想重试,因为Catch块中有特定错误。我无法通过onMessage中的异常。如何告诉RabbitMQ有异常并重试?


问题答案:

由于onMessage()不允许抛出已检查的异常,因此可以将异常包装在中RuntimeException并重新抛出。

try {
    testService.process(message);
} catch (BusinessException e) {
    throw new RuntimeException(e);
}

但是请注意,这可能导致消息无限期地重新发送。这是这样的:

RabbitMQ支持拒绝消息并要求代理重新排队。这显示在这里。但是RabbitMQ本身没有重试策略的机制,例如设置最大重试次数,延迟等。

使用Spring
AMQP时,“拒绝时重新排队”是默认选项。SimpleMessageListenerContainer如果有未处理的异常,Spring
将默认执行此操作。因此,在您的情况下,您只需要重新引发捕获的异常即可。但是请注意,如果您无法处理消息并且总是抛出异常,则它将无限期地重新发送,并导致无限循环。

您可以通过引发AmqpRejectAndDontRequeueException异常来覆盖每个消息的此行为,在这种情况下,不会重新排队该消息。

您也可以SimpleMessageListenerContainer通过设置完全关闭“拒绝时重新排队”行为

container.setDefaultRequeueRejected(false)

如果在RabbitMQ中设置了一条消息,则该消息被拒绝但不重新排队将丢失或转移到DLQ。

如果您需要具有最大尝试次数,延迟次数等的重试策略,最简单的方法是设置一个弹簧“无状态”
RetryOperationsInterceptor,它将在线程内进行所有重试(使用Thread.sleep()),而不会拒绝每次重试的消息(因此无需为每个重试返回RabbitMQ重试)。重试用尽时,默认情况下将记录一条警告,并且该消息将被使用。如果要发送到DLQ,则需要一个RepublishMessageRecovererMessageRecoverer不接受消息而无需重新排队的自定义(在后一种情况下,您还应该在队列上设置
RabbitMQ DLQ)。默认消息恢复程序的示例

container.setAdviceChain(new Advice[] {
        org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
                .stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2, 5000)
                .build()
});

显然,这样做的缺点是您将在重试的整个过程中占用线程。您还可以选择使用“有状态”
RetryOperationsInterceptor,该状态将在每次重试时将消息发送回RabbitMQ,但是延迟仍将Thread.sleep()在应用程序内实现,此外,设置有状态拦截器会更加复杂。

因此,如果您希望在不占用任何延迟的情况下重试,Thread则需要在RabbitMQ队列上使用TTL进行更复杂的自定义解决方案。如果您不希望出现指数退缩(因此每次重试不会增加延迟),则要简单一些。要实现这种解决方案,您基本上可以在RabbitMQ上创建另一个带有参数的队列:"x-message- ttl": <delay time in milliseconds>"x-dead-letter-exchange":"<name of the original queue>"。然后在您设置的主队列上"x-dead-letter-exchange":"<name of the queue with the TTL>"。因此,现在当您拒绝但不重新排队时,RabbitMQ会将其重定向到第二个队列。TTL过期时,它将被重定向到原始队列,然后重新传递到应用程序。因此,现在您需要一个重试拦截器,该拦截器在每次失败后拒绝发送给RabbitMQ的消息,并跟踪重试计数。为了避免需要在应用程序中保留状态(因为如果您的应用程序是集群的,则需要复制状态),则可以根据x-deathRabbitMQ设置的标头计算重试计数。在此处查看有关此标头的更多信息。因此,在这一点上,实现html" target="_blank">自定义拦截器比通过这种行为自定义Spring有状态拦截器要容易得多。

另请参阅Spring AMQP参考中有关重试的部分。



 类似资料:
  • 我使用的是Spring3.0.5和Hibernate3.6。在我的项目中,有一个场景,我必须回滚在抛出的任何异常或错误发生的事务。这是示例代码,除了当我抛出异常时事务不会回滚之外,一切都很好,但是如果抛出任何异常,比如mysql.IntegrityConstraintException,那么事务会回滚,为什么在我的情况下没有发生这种情况? hibernate.cfg 因此,正如我所说,我的问题是,

  • 我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥

  • 我正在创建一个Spring MessageListenerAdapter,用于侦听XML消息队列。 这是我的amqp配置: 当前,消息的接收工作正常,但仅当我的侦听器的返回类型是字符串时。 通过以下侦听器,我可以接收XML消息,所以这很好。但我无法获取原始消息的消息属性: 如果我将receiveMessage()的返回类型更改为字节[]或消息,则会收到以下错误消息: 我已经尝试过MessageCo

  • 问题内容: 我在下面使用HIbernate和Spring和JPA。当引发PersistenceException时,我想捕获它并返回错误消息,以便它不会传播到调用者。 但是我得到一个异常,说我需要在异常之后回滚事务,但是当我捕获到异常并且不想重新抛出该异常时,如何回滚它呢? 问题答案: 似乎没有办法回退由Spring ORM管理的失败事务。问题中显示的代码是服务类。将其持久性例程提取到单独的DAO

  • 场景是,我有一个带有spring批处理作业的spring启动应用程序。我正在尝试使用和使批处理作业异步,然后将此taskExecutor分配给JobLauncher。更改后,作业会异步运行,但我在持久化或更新数据库方面遇到了问题: 谢谢你的帮助!

  • 问题内容: 我正在经历所谓的 超时执行HGET company:product:settings,inst:1,队列:8,qu = 0,qs = 8,qc = 0,wr = 0/0,in = 79/1 超时异常。 奇怪的是,同一Redis实例和同一台机器上正在存储数据,但是它是引发此异常的特定应用程序。 更新: 实际上,同一应用程序在上面的一行中,从Redis接收数据。问题在于。 此外,我已经将多