当前位置: 首页 > 知识库问答 >
问题:

在用于AMQP的Spring集成中使用ImmediateRequeueMessageRecoverer?

莫选
2023-03-14

我们已经注意到,当错误消息被接收到Spring集成endpoint(从RabbitMQ)时,它们不会被重试。如果我们的业务代码(即接收消息的“服务方法”)出现了问题,导致它抛出异常,那么重试就会发生。

这是我们的配置:

var myService = ...
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName)
                .id(integrationFlowId)
                .autoStartup(autoStartup)
                .configureContainer(c -> c.acknowledgeMode(MANUAL)
                        .prefetchCount(10)
                        .concurrentConsumers(1)
                        .maxConcurrentConsumers(3))
                .messageConverter(messageConverter))
                .aggregate(...)
                .handle(myService, "myMethod", e -> e.advice(myAdvice()))
                .get();

myadvice方法的实现方式如下:

ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
backOffPolicy.setMaxInterval(5000L);

RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy((new SimpleRetryPolicy(MAX_VALUE)));
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.registerListener(new RetryListenerSupport() {
    @Override
    public <T, E extends Throwable> void onError(RetryContext ctx, RetryCallback<T, E> callback, Throwable e) {
        log.error("Caught {} due to {} (count = {})", e.getClass().getSimpleName(), e.getMessage(), ctx.getRetryCount(), e);
    }
});
StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
bean.setRetryOperations(retryTemplate);
bean.setMessageRecoverer(new ImmediateRequeueMessageRecoverer());
return bean.getObject();
[my-service-97c696799-6xs26] org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
[my-service-97c696799-6xs26]    at java.base/java.lang.Thread.run(Thread.java:831)
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
[my-service-97c696799-6xs26]    ... 6 common frames omitted
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.support.converter.MessageConversionException: Don't know how to convert (Body:'{ "yo" : "MTV Raps" }' MessageProperties [headers={content_type=application/json}, contentType=application/json, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=myservice.routingkey, deliveryTag=1, consumerTag=amq.ctag-9De2w0uuQxnve_9k6HZ7tw, consumerQueue=myservice.myqueue]) to an object because no event type was found
[my-service-97c696799-6xs26]    at com.mycompany.RabbitMQEventMessageConverter.fromMessage(RabbitMQEventMessageConverter.java:47)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.convertPayload(AmqpInboundChannelAdapter.java:361)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createMessageFromAmqp(AmqpInboundChannelAdapter.java:342)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:334)
[my-service-97c696799-6xs26]    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:299)
[my-service-97c696799-6xs26]    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
[my-service-97c696799-6xs26]    ... 10 common frames omitted

有人知道我如何在spring集成调用“服务方法”之前重新排队/重试失败的消息吗?

我们使用Spring Integration 5.4.6和Spring Boot 2.4.6。

共有1个答案

宋烨烁
2023-03-14

在创建消息之前执行转换。

转换错误通常被认为是致命的--重试没有任何意义,因为它会再次失败。

向入站适配器添加.errorchannel;它的下游流将获得一个errormessage来表示转换错误。

 类似资料:
  • 我有一个基于Java Spring-Cloud的微服务,使用Spring Boot Starter AMQP与RabbitMQ集成(摘自下面的): 现在我想使用Sleuth将此服务连接到Zipkin监控。根据文档,当启用AMQP支持时,Sleuth通过RabbitMQ队列发送其所有数据。出于某种原因,我想禁用这个默认行为,并通过HTTP发送数据。可能有一个魔法属性我找不到。您知道我如何强制应用程序

  • 寻找一个示例,展示将spring cloud sleuth与spring boot amqp(rabbit)发布者和订阅者集成在一起。 我确实在日志中看到以下消息 另外,在接收服务上应该做些什么?

  • 我有一个用例,用户将多个csv文件放到远程目录中,然后放置ready.txt来指示文件已准备好使用。当我们的applcation在远程目录中看到ready.txt文件时,它应该开始使用sftp文件入站通道适配器将所有文件复制到本地目录,包括ready.txt。是否有办法确保readt.txt文件是最后一个要复制到本地目录的文件? 因为当文件从远程目录复制到本地目录时,我有另一个文件入站通道适配器在

  • 我使用@Profile Spring注释在嵌入式、独立和容器管理的数据源之间进行选择。为了选择“嵌入”,我的集成测试被注释为激活适当的配置文件: 问题是,我想将'@ActiveProfiles'移动到TestConfigWrapper,但这样做没有得到执行,应用程序上下文也不会加载任何数据源。 有没有一种方法可以使用java配置来实现这一点?

  • Im在rabbit mq中使用spring Http集成和amqp,无法对并发用户进行负载测试。。。jmeter和rabbitmq消息计数中显示的消息计数不匹配。。对于30个用户来说效果很好

  • 我是SpringAMQP的新手。我有一个应用程序是生产者发送消息给另一个应用程序是消费者。 一旦消费者收到消息,我们将对数据进行验证。 如果数据是正确的,我们必须确认,消息应该从队列中删除。如果数据不正确,我们必须对数据进行NACK(否定确认),以便它在RabbitMQ中重新排队。 我偶然发现