当前位置: 首页 > 知识库问答 >
问题:

骆驼消息转发行为不如预期

子车雅珺
2023-03-14

我在Camel中有一个路由,当异常发生时,我想重试,但是我想设置一个属性,以便该路由在第二次尝试时可以做一些稍微不同的事情,以阻止重试中再次发生错误。这里有一个路线说明了我目前正在尝试的想法。

from("direct:onExceptionTest")
    .onException(Exception.class)
        .maximumRedeliveries(1)
        .log("Retrying")
        .setProperty("retrying", constant(true))
    .end()
    .log("Start")   
    .choice()
        .when(property("retrying").isNull())
            .log("Throwing")
            .throwException(new Exception("Hello world"))
        .end()
    .end()
    .log("Done")

显然这不是真正的路线;整个选项主体只是在某些情况下模拟我的组件错误。我希望看到以下消息被记录:

Start
Throwing
Retrying
Start
Done

但我真正看到的是:

Start
Throwing
Retrying
Failed delivery for (MessageId: ... on ExchangeId: ...). Exhausted after delivery attempt: 2 caught: java.lang.Exception: Hello world. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[Log(onExceptionTest)[Retrying]], Channel[setProperty(retrying, true)]]]]

我尝试将handled(true)添加到异常处理程序中,但这只会抑制错误消息。我没有看到第二条开始或完成日志消息

为什么我的路线没有按照我的预期运行,我需要做什么才能让它按照我想要的方式运行?

@程序员Dan指出,问题是重新传递不是为了我想要实现的目标,这可以解释为什么我的路由不起作用!所以我需要在我的处理程序中完成这项工作,但是我的路由调用了一个网络服务,还有一些其他步骤,我不想在处理程序中重复所有这些。我想出了这个方法,它和预期的一样起作用,但是它涉及到路由从一开始就再次调用自己。这是个坏主意吗?我会不会用这种方法让自己陷入困境?

from("direct:onExceptionTest")
    .onException(Exception.class)
        .onWhen(property("retrying").isNull()) // don't retry forever
        .log("Retrying")
        .setProperty("retrying", constant(true))
        .handled(true)
        .to("direct:onExceptionTest") // is recursion bad?
    .end()
    .log("Start")   
    .choice()
        .when(property("retrying").isNull())
            .log("Throwing")
            .throwException(new Exception("Hello world"))
        .end()
    .end()
    .log("Done")

共有2个答案

宗政功
2023-03-14

关于Camel的返还机制,有几点需要考虑。首先,查看关于这个主题的文档,这些文档可能会挑战你对Camel如何处理返还的假设。我提到的一点是,Camel尝试在失败点进行返还,它不会从路由的开始重新开始(就像你似乎假设的那样)。如果我正确理解文档(我有一段时间没有尝试过这种模式了),你基本上是告诉它重试几次抛出异常,我怀疑这是你想测试的。

第二,我建议直接在onException()处理器链中执行替代处理,正如在同一文档中进一步演示的那样。基本上,您可以指定希望通过自定义处理器处理消息的方式,并使用handled(true)stop()指示无需进一步处理。

总而言之,重新交付通常意味着处理典型的endpoint交付故障,如间歇性连接中断、接收服务器暂时不可用等。在这些情况下,最合理的做法是“再试一次”,并有一个合理的成功预期。如果需要更复杂的逻辑来处理重试,请在onException()处理器链中使用自定义处理器或一系列处理器。

尹弘壮
2023-03-14

使用onRe递送处理器来设置属性:

String KEY = "retrying";

from("direct:onExceptionTest")
     .onException(RuntimeException.class)
         .onRedelivery(new Processor() { // Sets a processor that should be processed before a redelivery attempt.
             @Override
             public void process(final Exchange exchange) throws Exception {
                 LOG.info("Retrying");
                 exchange.setProperty(KEY, true);
             }
        })
        .maximumRedeliveries(1)
        .handled(true)
    .end()
    .log("Start")
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            LOG.info("No problem");
        }
    })
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            if (exchange.getProperty(KEY) == null) {
                LOG.info("Throwing");
                throw new RuntimeException("Hello World");
            }
            else {
                LOG.info("No throwing");
            }
        }
    })
    .log("Done");

这张照片

[                          main] route1                         INFO  Start
[                          main] OnExceptionHandler             INFO  No problem
[                          main] OnExceptionHandler             INFO  Throwing
[                          main] OnExceptionHandler             INFO  Retrying
[                          main] OnExceptionHandler             INFO  No throwing
[                          main] route1                         INFO  Done

正如@程序员丹所指出的,只有失败的处理器会被重新执行,而不是第一个没有任何问题就通过的处理器。

编辑:

如果所有的处理都必须重新完成,那么您可以使用带有dotrydoCatch的子路由,如下所示:

from("direct:onExceptionTest")
    .doTry()
        .to("direct:subroute")
    .doCatch(RuntimeException.class)
        .setProperty(KEY, constant(true))
        .to("direct:subroute")
    .end()
    .log("Done");

from("direct:subroute")
    .log("Start")
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            LOG.info("No problem");
        }
    })
    .process(new Processor() {
        @Override
        public void process(final Exchange exchange) throws Exception {
            if (exchange.getProperty(KEY) == null) {
                LOG.info("Throwing");
                throw new RuntimeException("Hello World");
            }
            else {
                LOG.info("No throwing");
            }
        }
    });

从骆驼文档:

使用doTry时<代码>文件处理doFinally则常规的驼峰错误处理程序不适用。这意味着任何onException或类似事件都不会触发。原因是,doTry<代码>文件处理doFinally实际上是它自己的错误处理程序,它的目标是模仿并像Java中的try/catch/finally那样工作。

 类似资料:
  • 我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码

  • 我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?

  • 我正在尝试Apache Camel技术,但遇到了麻烦。 我使用Jpa组件的轮询消费者as from(),并将实体上的@consumed注释设置为在流程结束时将其状态更改为“已成功处理”。它很好用。 但如果出现任何错误,我不想在数据库中设置“已成功处理”标志,而是设置“错误发生”之类的标志。 为了解决这个问题,我创建了一个带有处理器的“死信队列”,该处理器试图更改我正在进行的实体的状态,以便稍后保存

  • 我有一个简单的驼峰路由,它接受一个项目列表,将它们拆分,将每个元素发送到mq节点进行处理,然后通过聚合器将它们连接在一起。 非常接近合成消息处理器:http://camel.apache.org/composed-message-processor.html 但是我们注意到拆分后,camel会创建多个并发消费者?或者交换?因为消息被发送给多个消费者,他们永远不会完成。 列表:1,2,3,4 拆分:

  • 我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利

  • 我的主要问题是:在JPA实体类内的方法上,除了@Consumed注释之外,还有其他选择吗? 我问这个问题是因为我真的想把我的JPA模型(例如我的所有实体)具体化,以便能够在使用相同数据库的其他项目之间共享。此外,我不想在我的子项目中出现一些骆驼依赖,这意味着要删除@Consumed注释。我基本上有这样一条路线: 我尝试了不同的解决方案: 在我的camel项目中,使用@Consumed方法扩展子模块