当msg处理抛出异常时,如何有效地支持JMS重新交付?
我有一个使用JMS(ActiveMQ)的流,它具有配置为允许n次重新传递尝试的连接工厂。
我希望在处理msg时出现任何错误,导致msg在connectionFactory配置允许的情况下被放回重新交付,然后在最大重新交付尝试用尽时,交付给DLQ。与AMQ保持一致。
对一个相关SO问题的回答意味着我可能会有一个重新抛出的错误通道,它应该触发重新交付:Spring Integration DSL ErrorHandling
但是,以下情况不会发生:
/***
* Dispatch msgs from JMS queue to a handler using a rate-limit
* @param connectionFactory
* @return
*/
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {
IntegrationFlow flow = IntegrationFlows.from(
Jms.inboundAdapter(connectionFactory)
.configureJmsTemplate(t -> t.receiveTimeout(1000))
.destination(INPUT_DIRECT_QUEUE),
e -> e.poller(Pollers
.fixedDelay(5000)
.errorChannel("customErrorChannel")
//.errorHandler(this.msgHandler)
.maxMessagesPerPoll(2))
).handle(this.msgHandler).get();
return flow;
}
@Bean
public MessageChannel customErrorChannel() {
return MessageChannels.direct("customErrorChannel").get();
}
@Bean
public IntegrationFlow customErrorFlow() {
return IntegrationFlows.from(customErrorChannel())
.handle ("simpleMessageHandler","handleError")
.get();
}
errorChannel方法impl:
public void handleError(Throwable t) throws Throwable {
log.warn("got error from customErrorChannel");
throw t;
}
当从flow2中的处理程序引发异常时,errorChannel确实会获得异常,但重新引发会导致MessageHandlingException:
2018-08-13 09:00:34.221 WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler : got error from customErrorChannel
2018-08-13 09:00:34.224 WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler : Error message was not delivered.
org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
它可以与消息驱动的通道适配器一起使用,但我认为这不是您想要的,因为这个问题。
因为轮询的适配器使用JmsTemplate。receive()
操作,在调用流时,消息已经确认。
您需要使用带有JmsTransactionManager
的事务轮询器,以便错误流引发的异常回滚事务并重新传递消息。
我正在以以下方式向队列发送消息: 我想安排重复我的信息。我的意思是,无论控制器(如下所示)中的this line正在发送什么消息,我都想继续发送,比如说10次或100次(取决于我设置的计时器)。我的消费者(以下未显示)将继续消费相同的消息,直到我要求它停止。例如,即使我的生产者将发送消息10或100次,如果我想在第5次(生产者发送消息10次)或第50次(生产者发送消息100次)停止接收消息,我应该
当BOTHRESH=0时,MQ JMS类是否可以重新传递消息? 使用上述设置,是否可以使用重新传递计数通过应用程序代码管理邮件传递重试?
我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。
我正在尝试从JMS队列(使用ActiveMQ)读取消息。我面临的问题是,消息正在从队列中读取,但没有显示在“服务激活器”中。 非常感谢您的帮助。 我的代码如下: (1) Spring配置 (2) 服务激活器MDP: (3) 申请开始课程: 谢谢
我正在使用WerbLogic 10.3.5和Spring 3.0实现JMS队列。我有以下Spring配置: 我的消息创建代码如下所示: 我的听众是这样的: 消息被正确创建,侦听器的onMessage()方法被调用,但是如果逻辑失败,我抛出RuntimeException(),消息不会被重新传递。我尝试了上述代码的许多细微变化(例如设置SessionAcknowledgeMemodeName=SES
我将我们的应用程序从SpringBoot1.5迁移到SpringBoot2.2。我正在为我们的渐变身材而挣扎。在过去,我们使用以下方法获得一个包含所有Spring依赖项的可执行Jar文件,以分别在CI管道中运行集成测试: 集成Jar任务用我们的集成测试类和主类构建一个基本jar。在过去,我们能够通过重新打包任务“增强”这个罐子。但是在Spring Boot 2.2中,此任务不再存在。Spring文