Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
.destination(destination)
.errorChannel(errorChannel)
.get();
public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {
public TransactedMessageListenerContainer() {
this.setSessionTransacted(true);
}
}
如果发生异常,ActiveMQ broker不会相应地重新传递消息。
当我使用org.apache.activemq.broker.brokerservice
进行简单的集成测试时,JMS消息会被重新传递,即可以实现重试机制
如何用ActiveMQ为Tomcat实现同样的功能?
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(this::errorMessageHandler)
.get();
}
public void errorMessageHandler(Message<?> message) {
log.warn("handling error message");
log.warn("headers: " + message.getHeaders().toString());
log.warn("payload: " + message.getPayload().toString());
MessagingException exception = (MessagingException) message.getPayload();
log.warn("original payload: " + exception.getFailedMessage().getPayload());
throw exception; // make JMS broker redeliver
}
如果希望进行回退和重新传递,错误流必须重新抛出异常,而不是像使用defaulterrorchannel
那样吞咽异常。你可以在这里找到类似的问题和答案。
更新
嗯,我不知道你的问题在哪里,我有一个类似于你的测试用例:
@Autowired
private MessageChannel errorChannel;
@Bean
public IntegrationFlow jmsMessageDrivenFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
.configureListenerContainer(c -> c.sessionTransacted(true))
.errorChannel(this.errorChannel)
.destination("jmsMessageDriver"))
.<String, String>transform(p -> {
throw new RuntimeException("intentional");
})
.get();
}
@Bean
public IntegrationFlow errorHandlingFlow() {
return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
.handle(m -> {
MessagingException exception = (MessagingException) m.getPayload();
Message<?> failedMessage = exception.getFailedMessage();
throw exception;
})
.get();
}
"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"
我正在尝试将我的应用程序与JMS队列集成(使用ActiveMQ)。我使用Spring集成作为集成组件。我们希望有连池。已将'maxContopt消费者'作为100提供给'DefaultMessageListenerContainer'。 问题是,一旦从队列中读取了所有消息,“消费者数量”仍为100(如ActiveMq控制台上所示)。当我们在数据库中使用连接池(通过JNDI)时,一旦不再需要连接,它
在我们的应用程序中,我们使用Spring与ActiveMQ集成。我们面临一个问题,例如每当ActiveMQ服务器关闭时,应用程序都会抛出: 你能建议如何让我的应用程序在ActiveMQ关闭的情况下运行吗 请在下面找到ActiveMQ配置:
本文向大家介绍如何集成 Spring Boot 和 ActiveMQ?相关面试题,主要包含被问及如何集成 Spring Boot 和 ActiveMQ?时的应答技巧和注意事项,需要的朋友参考一下 对于集成 Spring Boot 和 ActiveMQ,我们使用 spring-boot-starter-activemq 依赖关系。 它只需要很少的配置,并且不需要样板代码。
我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka
我的公司目前正在研究Thrift和ActiveMQ的集成。我们希望建立一个独立于语言的服务层,该服务层运行在单个http服务器上,每个thrift服务都能够通过ActiveMQ与其他thrift服务通信。到目前为止,我还没有找到任何其他人试图实施这一点。我很好奇其他人会如何实现这一点,以及是否有这样做的文档。 当前原型使用一个简单的python服务器来托管各种备用服务。在每个服务调用(即更新设置)
我尝试使用Spring集成HTTP开发SpringBoot Rest服务器- 我有一个控制器,用“@Controller”和“@RequestMapping”注释,并尝试创建以下流: 获取请求"/"- 但它不起作用。 我的集成Xml: 错误是: 但在我看来,控制器应该是通过Request Map注释的订阅者... 我上传了一个示例github项目:https://github.com/marcel