当前位置: 首页 > 知识库问答 >
问题:

集成了Tomcat与ActiveMQ和Spring的DefaultMessageListenerContainer-Redeliver JMS msg

韦鸣
2023-03-14
 Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
      .destination(destination)
      .errorChannel(errorChannel)
     .get();
public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {

     public TransactedMessageListenerContainer() {
         this.setSessionTransacted(true);
     } 
}

如果发生异常,ActiveMQ broker不会相应地重新传递消息。

当我使用org.apache.activemq.broker.brokerservice进行简单的集成测试时,JMS消息会被重新传递,即可以实现重试机制

如何用ActiveMQ为Tomcat实现同样的功能?

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
             .handle(this::errorMessageHandler)
             .get();
}

public void errorMessageHandler(Message<?> message) {
    log.warn("handling error message");
    log.warn("headers: " + message.getHeaders().toString());
    log.warn("payload: " + message.getPayload().toString());
    MessagingException exception = (MessagingException) message.getPayload();
    log.warn("original payload: " + exception.getFailedMessage().getPayload());
    throw exception; // make JMS broker redeliver
}

共有1个答案

史俊德
2023-03-14

如果希望进行回退和重新传递,错误流必须重新抛出异常,而不是像使用defaulterrorchannel那样吞咽异常。你可以在这里找到类似的问题和答案。

更新

嗯,我不知道你的问题在哪里,我有一个类似于你的测试用例:

    @Autowired
    private MessageChannel errorChannel;

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
                        .configureListenerContainer(c -> c.sessionTransacted(true))
                        .errorChannel(this.errorChannel)
                        .destination("jmsMessageDriver"))
                .<String, String>transform(p -> {
                    throw new RuntimeException("intentional");
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                .handle(m -> {
                    MessagingException exception = (MessagingException) m.getPayload();
                    Message<?> failedMessage = exception.getFailedMessage();
                    throw exception;
                })
                .get();
    }
"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"
 类似资料:
  • 我正在尝试将我的应用程序与JMS队列集成(使用ActiveMQ)。我使用Spring集成作为集成组件。我们希望有连池。已将'maxContopt消费者'作为100提供给'DefaultMessageListenerContainer'。 问题是,一旦从队列中读取了所有消息,“消费者数量”仍为100(如ActiveMq控制台上所示)。当我们在数据库中使用连接池(通过JNDI)时,一旦不再需要连接,它

  • 在我们的应用程序中,我们使用Spring与ActiveMQ集成。我们面临一个问题,例如每当ActiveMQ服务器关闭时,应用程序都会抛出: 你能建议如何让我的应用程序在ActiveMQ关闭的情况下运行吗 请在下面找到ActiveMQ配置:

  • 本文向大家介绍如何集成 Spring Boot 和 ActiveMQ?相关面试题,主要包含被问及如何集成 Spring Boot 和 ActiveMQ?时的应答技巧和注意事项,需要的朋友参考一下 对于集成 Spring Boot 和 ActiveMQ,我们使用 spring-boot-starter-activemq 依赖关系。 它只需要很少的配置,并且不需要样板代码。

  • 我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka

  • 我的公司目前正在研究Thrift和ActiveMQ的集成。我们希望建立一个独立于语言的服务层,该服务层运行在单个http服务器上,每个thrift服务都能够通过ActiveMQ与其他thrift服务通信。到目前为止,我还没有找到任何其他人试图实施这一点。我很好奇其他人会如何实现这一点,以及是否有这样做的文档。 当前原型使用一个简单的python服务器来托管各种备用服务。在每个服务调用(即更新设置)

  • 我尝试使用Spring集成HTTP开发SpringBoot Rest服务器- 我有一个控制器,用“@Controller”和“@RequestMapping”注释,并尝试创建以下流: 获取请求"/"- 但它不起作用。 我的集成Xml: 错误是: 但在我看来,控制器应该是通过Request Map注释的订阅者... 我上传了一个示例github项目:https://github.com/marcel