当前位置: 首页 > 知识库问答 >
问题:

使用JMS重试/重传JavaDSL的Spring集成

吴星汉
2023-03-14

当msg处理抛出异常时,如何有效地支持JMS重新交付?

我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。

我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。

对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新交付:Spring Integration DSL ErrorHandling

但是,以下情况不会发生:

/***
 * Dispatch msgs from JMS queue to a handler using a rate-limit
 * @param connectionFactory
 * @return
 */
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {

    IntegrationFlow flow =  IntegrationFlows.from(
            Jms.inboundAdapter(connectionFactory)
                    .configureJmsTemplate(t -> t.receiveTimeout(1000))
                    .destination(INPUT_DIRECT_QUEUE),
            e -> e.poller(Pollers
                    .fixedDelay(5000)
                    .errorChannel("customErrorChannel")
                    //.errorHandler(this.msgHandler)
                    .maxMessagesPerPoll(2))
    ).handle(this.msgHandler).get();

    return flow;
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct("customErrorChannel").get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    return IntegrationFlows.from(customErrorChannel())
            .handle ("simpleMessageHandler","handleError")
            .get();
}

errorChannel方法impl:

   public void handleError(Throwable t) throws Throwable {
        log.warn("got error from customErrorChannel");
        throw t;
    }

当从flow2中的处理程序引发异常时,errorChannel确实会获得异常,但重新引发会导致MessageHandlingException:

2018-08-13 09:00:34.221  WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler         : got error from customErrorChannel
2018-08-13 09:00:34.224  WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler    : Error message was not delivered.

org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]

共有1个答案

东郭昌胤
2023-03-14

它可以与消息驱动的通道适配器一起使用,但我认为这不是您想要的,因为这个问题。

因为轮询的适配器使用JmsTemplate。receive()操作,在调用流时,消息已经确认。

您需要使用带有JmsTransactionManager的事务轮询器,以便错误流引发的异常回滚事务并重新传递消息。

 类似资料:
  • 我正在以以下方式向队列发送消息: 我想安排重复我的信息。我的意思是,无论控制器(如下所示)中的this line正在发送什么消息,我都想继续发送,比如说10次或100次(取决于我设置的计时器)。我的消费者(以下未显示)将继续消费相同的消息,直到我要求它停止。例如,即使我的生产者将发送消息10或100次,如果我想在第5次(生产者发送消息10次)或第50次(生产者发送消息100次)停止接收消息,我应该

  • 当BOTHRESH=0时,MQ JMS类是否可以重新传递消息? 使用上述设置,是否可以使用重新传递计数通过应用程序代码管理邮件传递重试?

  • 我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。

  • 我正在尝试从JMS队列(使用ActiveMQ)读取消息。我面临的问题是,消息正在从队列中读取,但没有显示在“服务激活器”中。 非常感谢您的帮助。 我的代码如下: (1) Spring配置 (2) 服务激活器MDP: (3) 申请开始课程: 谢谢

  • 我正在使用WerbLogic 10.3.5和Spring 3.0实现JMS队列。我有以下Spring配置: 我的消息创建代码如下所示: 我的听众是这样的: 消息被正确创建,侦听器的onMessage()方法被调用,但是如果逻辑失败,我抛出RuntimeException(),消息不会被重新传递。我尝试了上述代码的许多细微变化(例如设置SessionAcknowledgeMemodeName=SES

  • 我将我们的应用程序从SpringBoot1.5迁移到SpringBoot2.2。我正在为我们的渐变身材而挣扎。在过去,我们使用以下方法获得一个包含所有Spring依赖项的可执行Jar文件,以分别在CI管道中运行集成测试: 集成Jar任务用我们的集成测试类和主类构建一个基本jar。在过去,我们能够通过重新打包任务“增强”这个罐子。但是在Spring Boot 2.2中,此任务不再存在。Spring文