当前位置: 首页 > 知识库问答 >
问题:

如何在成功处理camel内部重新传递后防止MQ Broker重新传递消息(transacted camel路由)

栾和玉
2023-03-14

我正试图用JPATransactionManager即spring PlatformTransactionManager运行一个camel transacted()路由(一个独立的java进程)(因为我希望camel路由在单个DB事务中运行),但我无法在事务方法失败的情况下禁止从MQ Broker进行重新传递,尽管我在onException子句中使用了handle(true)以及自定义的重新传递策略(已成功执行)。我只希望MQ在服务崩溃时重新交付。

在下面尝试过,但不起作用:

>

  • 在JMSComponent config中设置setTransacted(false)以防止camel jms运行为transacted_session jms模式,但它不起作用
  • 从事务块中删除和删除异常
  • 骆驼重新交付后跟已处理(true)。

    onException(Exception.class)
        .log("ERROR OCCURRED")
        .redeliveryPolicyRef("myRedeliveryPolicy")
        .handled(true)
        .to(getPostExceptionRoute());
    
    @Bean
    @Autowired
    public RedeliveryPolicy myRedeliveryPolicy() {
        RedeliveryPolicy myRedeliveryPolicy= new RedeliveryPolicy();
        myRedeliveryPolicy.setMaximumRedeliveries(2);
        myRedeliveryPolicy.setMaximumRedeliveryDelay(2000);
        return myRedeliveryPolicy;
    }
    
    @Bean
    @Autowired
    public JmsComponent jms(IJMSConnectionFactory cf) throws JMSException {
        JmsComponent jmsComponent = new JmsComponent();
        jmsComponent.setConfiguration(jmsConfig(cf));
        jmsComponent.setTransacted(false);
        return jmsComponent;
    }
    
    from("jms:queue:TestQueue?acknowledgementModeName=CLIENT_ACKNOWLEDGE")
        .unmarshal().json(JsonLibrary.Jackson, TestObject.class)
        .transacted()
        .processRef("myPersistInDBProcessor")
    

    我希望camel会按照重新交付策略(工作)尝试重新交付,但MQ不应该重新交付。

  • 共有1个答案

    衡泰
    2023-03-14

    我希望camel按照重新交付策略(工作)尝试重新交付,但MQ不应该重新交付

    当MQ绝对不能执行重新传递时(因为您在Camel中处理错误),您应该删除acknowledgementModeName=client_acknowledge或显式设置auto_acknowledge(默认值)。

    从代理的角度来看,只要消息没有被确认,它就没有被传递。auto_acknowledge在使用后立即确认消息,如果您永远不希望得到重新交付,这是有意义的。

    client_acknowledge则只在特定条件下确认消息,有关此方面的更多信息,请参阅本文。

    如果您希望MQ重新交付,但在大多数情况下使用Camel“重写”它们,则必须使用已处理的消息。

    jmsComponent.setLazyCreateTransactionManager(false);
    jmsComponent.setTransacted(true);
    

    对于这种类型的事务,您根本不需要Spring TransactionManager。所以我猜JPatransactionManager被JMS忽略了,您的JMS消费应该是事务性的。

    现在,当您的骆驼错误处理程序通过使用handled(true)“吞噬”一个异常时,必须没有MQ重新传递。但当异常传播回代理时,MQ会执行重新传递。

    我希望我的骆驼路由在单个db事务中运行

     类似资料:
    • 我正在读一条来自Solace的信息。我能够成功地阅读信息。假设我正在阅读一条消息,在侦听器线程上读取/处理消息时,应用程序崩溃。那我怎么能在那上面再读一遍那条信息呢。使用下面的代码,我无法再次阅读该消息。下面是我的配置

    • 我读到:http://www.javaworld.com/article/2074123/java-web-development/transaction-and-redelivery-in-jms.html?page=2 "通常,确认特定消息会确认会话接收的所有先前消息"(在客户端确认模式下) “邮件重新传递不是自动的,但在某些情况下会重新传递邮件” 我的问题是: 如何确保每次收到消息时都有一个

    • 高层体系结构 JMS(生产者/消费者)<---->Artemis(STOMP)<---->Websocket-Broker-Relay-Service<---->STOMP-over-Websocket-client(生产者/消费者)

    • 我正在使用apacheMQ作为队列管理器。我使用Spring的DefaultMessageListenerContainer来使用消息。我已经对它进行了配置,以便它有一个事务:

    • 我将不折不扣地学习以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。 我以这样的方式启动RabbitMQ服务器: 我生成了两个消费者,当我Ctrl+C其中一个时,另一个正在运行的消费者不会接收到最初发往前一个消费者的消息。如何在Ctrl+C'ing从一个消费者中重新传递消息? 编辑:我现在正在通过'brew'安装Rabbi

    • 有以下骆驼路线。 重新交付策略在xml中定义如下- 但是,当抛出异常时,会在执行OnException块之前进行重新传递尝试(一些配置属性会在onException块中更新。在OnException内部的DiscoveryService中有一个调试点,它会在进行重新传递尝试后被调用)。因此,当前消息在没有重新传递的情况下丢失。不确定为什么会发生这种情况。使用actiemq-camel版本5.8.0