当前位置: 首页 > 知识库问答 >
问题:

Spring集成(重试策略)

敖永丰
2023-03-14

我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。

我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。

我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。


        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        RestTemplate restTemplate = new RestTemplate();
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(BOUTIQUE_QUEUE_NAME);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
                .handle(msg ->
                {
                    String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
                    HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
                    restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
                    System.out.println(msgString);
                   
                })
                .get();
    }

共有1个答案

於子晋
2023-03-14

将重试侦听器添加到侦听器容器的建议链。看见https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#retry和https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#async-听众

编辑

@SpringBootApplication
public class So63596805Application {

    private static final Logger LOG = LoggerFactory.getLogger(So63596805Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63596805Application.class, args);
    }

    @Bean
    IntegrationFlow flow(SimpleRabbitListenerContainerFactory factory, RabbitTemplate template) {
        SimpleMessageListenerContainer container = factory.createListenerContainer();
        container.setQueueNames("foo");
        container.setAdviceChain(RetryInterceptorBuilder.stateless()
                .maxAttempts(5)
                .backOffOptions(1000, 2.0, 10000)
                .recoverer((msg, cause) -> LOG.info("Retries exhausted for " + msg))
                .build());
        return IntegrationFlows.from(Amqp.inboundAdapter(container))
                .handle(msg -> {
                    LOG.info(msg.getPayload().toString());
                    throw new RuntimeException("test");
                })
                .get();
    }

}

这使用指数退避策略。

如果您使用

.maxAttempts(4)
.backOffOptions(1000, 5.0, 60000)

您将在1、5和25秒后获得3次重试。

1000, 8.0, 60000会给你1、8和60秒。

如果你必须有你的规格(1,5,60),你将需要一个自定义的退避政策。

 类似资料:
  • 我有一个spring-boot应用程序,它有一个POST REST API,实习生可以使用JPA和Hikari将数据插入到Postgresql中。由于Post请求的数量很大,我计划在JPA内置方法(如save()、saveAll()、findAll()等)上实现Spring重试,

  • 我知道这方面有几个问题...但我仍然无法使其工作。我有一个Spring应用程序,我正在尝试为其编写集成测试。我尝试使用xml文件设置应用程序上下文(或多或少与我用于应用程序的xml文件相同)。出现的问题如下: > 我指定了正确的路径,然后它开始抱怨在类路径上找不到资源。我已经复制了test/resources文件夹中缺少的文件。 现在,它无法加载应用程序上下文并出现以下错误:没有找到[javax.

  • 我有一些集成测试是这样的: 和下面这样的测试: 我希望能够抵消时钟bean在一天的不同时间运行一些测试。我该怎么做? 但那里什么都没发生。我需要@import什么吗?我需要自动连线吗? 谢谢!

  • 当msg处理抛出异常时,如何有效地支持JMS重新交付? 我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。 我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。 对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI

  • 我对Spring Integration是新手。我尝试使用文件拆分器将消息从文件中拆分出来,然后使用.aggregate()构建单个消息并发送到输出通道。我有标记为true,因此apply-sequence现在默认为false。我已经使用EnricHeaders将correlationId设置为常量“1”。我在设置realease策略时遇到了困难,因为我没有在序列结束上保持。下面是我的代码的外观。