我在Camel中有一个路由,当异常发生时,我想重试,但是我想设置一个属性,以便该路由在第二次尝试时可以做一些稍微不同的事情,以阻止重试中再次发生错误。这里有一个路线说明了我目前正在尝试的想法。
from("direct:onExceptionTest")
.onException(Exception.class)
.maximumRedeliveries(1)
.log("Retrying")
.setProperty("retrying", constant(true))
.end()
.log("Start")
.choice()
.when(property("retrying").isNull())
.log("Throwing")
.throwException(new Exception("Hello world"))
.end()
.end()
.log("Done")
显然这不是真正的路线;整个选项
主体只是在某些情况下模拟我的组件错误。我希望看到以下消息被记录:
Start
Throwing
Retrying
Start
Done
但我真正看到的是:
Start
Throwing
Retrying
Failed delivery for (MessageId: ... on ExchangeId: ...). Exhausted after delivery attempt: 2 caught: java.lang.Exception: Hello world. Processed by failure processor: FatalFallbackErrorHandler[Pipeline[[Channel[Log(onExceptionTest)[Retrying]], Channel[setProperty(retrying, true)]]]]
我尝试将handled(true)
添加到异常处理程序中,但这只会抑制错误消息。我没有看到第二条开始或完成日志消息。
为什么我的路线没有按照我的预期运行,我需要做什么才能让它按照我想要的方式运行?
@程序员Dan指出,问题是重新传递不是为了我想要实现的目标,这可以解释为什么我的路由不起作用!所以我需要在我的处理程序中完成这项工作,但是我的路由调用了一个网络服务,还有一些其他步骤,我不想在处理程序中重复所有这些。我想出了这个方法,它和预期的一样起作用,但是它涉及到路由从一开始就再次调用自己。这是个坏主意吗?我会不会用这种方法让自己陷入困境?
from("direct:onExceptionTest")
.onException(Exception.class)
.onWhen(property("retrying").isNull()) // don't retry forever
.log("Retrying")
.setProperty("retrying", constant(true))
.handled(true)
.to("direct:onExceptionTest") // is recursion bad?
.end()
.log("Start")
.choice()
.when(property("retrying").isNull())
.log("Throwing")
.throwException(new Exception("Hello world"))
.end()
.end()
.log("Done")
关于Camel的返还机制,有几点需要考虑。首先,查看关于这个主题的文档,这些文档可能会挑战你对Camel如何处理返还的假设。我提到的一点是,Camel尝试在失败点进行返还,它不会从路由的开始重新开始(就像你似乎假设的那样)。如果我正确理解文档(我有一段时间没有尝试过这种模式了),你基本上是告诉它重试几次抛出异常,我怀疑这是你想测试的。
第二,我建议直接在onException()
处理器链中执行替代处理,正如在同一文档中进一步演示的那样。基本上,您可以指定希望通过自定义处理器处理消息的方式,并使用handled(true)
和stop()
指示无需进一步处理。
总而言之,重新交付通常意味着处理典型的endpoint交付故障,如间歇性连接中断、接收服务器暂时不可用等。在这些情况下,最合理的做法是“再试一次”,并有一个合理的成功预期。如果需要更复杂的逻辑来处理重试,请在onException()
处理器链中使用自定义处理器或一系列处理器。
使用onRe递送
和处理器
来设置属性:
String KEY = "retrying";
from("direct:onExceptionTest")
.onException(RuntimeException.class)
.onRedelivery(new Processor() { // Sets a processor that should be processed before a redelivery attempt.
@Override
public void process(final Exchange exchange) throws Exception {
LOG.info("Retrying");
exchange.setProperty(KEY, true);
}
})
.maximumRedeliveries(1)
.handled(true)
.end()
.log("Start")
.process(new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
LOG.info("No problem");
}
})
.process(new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
if (exchange.getProperty(KEY) == null) {
LOG.info("Throwing");
throw new RuntimeException("Hello World");
}
else {
LOG.info("No throwing");
}
}
})
.log("Done");
这张照片
[ main] route1 INFO Start
[ main] OnExceptionHandler INFO No problem
[ main] OnExceptionHandler INFO Throwing
[ main] OnExceptionHandler INFO Retrying
[ main] OnExceptionHandler INFO No throwing
[ main] route1 INFO Done
正如@程序员丹所指出的,只有失败的处理器会被重新执行,而不是第一个没有任何问题就通过的处理器。
编辑:
如果所有的处理都必须重新完成,那么您可以使用带有dotry
和doCatch
的子路由,如下所示:
from("direct:onExceptionTest")
.doTry()
.to("direct:subroute")
.doCatch(RuntimeException.class)
.setProperty(KEY, constant(true))
.to("direct:subroute")
.end()
.log("Done");
from("direct:subroute")
.log("Start")
.process(new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
LOG.info("No problem");
}
})
.process(new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
if (exchange.getProperty(KEY) == null) {
LOG.info("Throwing");
throw new RuntimeException("Hello World");
}
else {
LOG.info("No throwing");
}
}
});
从骆驼文档:
使用doTry
时<代码>文件处理doFinally
则常规的驼峰错误处理程序不适用。这意味着任何onException
或类似事件都不会触发。原因是,doTry
<代码>文件处理doFinally
实际上是它自己的错误处理程序,它的目标是模仿并像Java中的try/catch/finally那样工作。
我有一个场景,我想“拉”RabbitMQ队列/主题的消息,并一次处理一个。特别是当消费者启动时,队列中已经有消息。我尝试了以下方法,但没有成功(这意味着,这些选项中的每一个都会读取队列,直到队列为空,或者直到另一个线程关闭上下文)。 1.第一次处理后立即停止路由 与1类似,但使用闩锁而不是while loop和sleep。 使用轮询消费者 使用ConsumerTemplate()-类似于上面的代码
我在为 端口设置 消费者以捕获消息时遇到问题。我的: 申请开始: 并且<code>514</code>端口已打开但未侦听 我可以在tcpdump中看到,tcpdump-I eth1-nn-A-s 0端口514和udp正确发送和接收消息。 有人能告诉我我做错了什么吗?
我正在尝试Apache Camel技术,但遇到了麻烦。 我使用Jpa组件的轮询消费者as from(),并将实体上的@consumed注释设置为在流程结束时将其状态更改为“已成功处理”。它很好用。 但如果出现任何错误,我不想在数据库中设置“已成功处理”标志,而是设置“错误发生”之类的标志。 为了解决这个问题,我创建了一个带有处理器的“死信队列”,该处理器试图更改我正在进行的实体的状态,以便稍后保存
我有一个简单的驼峰路由,它接受一个项目列表,将它们拆分,将每个元素发送到mq节点进行处理,然后通过聚合器将它们连接在一起。 非常接近合成消息处理器:http://camel.apache.org/composed-message-processor.html 但是我们注意到拆分后,camel会创建多个并发消费者?或者交换?因为消息被发送给多个消费者,他们永远不会完成。 列表:1,2,3,4 拆分:
我在JPA上遇到了以下问题,但这可能更像是一个关于骆驼的概念问题。 我需要一个基于cron的石英消费者。但如果触发了,我想选择JPA组件作为第一步。 但是如果我用“to”调用JPA组件,那么它被用作生产者,而不是消费者。我可以以某种方式使用JPA组件来处理这个问题吗,或者我必须遵循服务激活器(基于bean的)逻辑并将JPA组件留在后面? 提前谢谢你,葛格利
我的主要问题是:在JPA实体类内的方法上,除了@Consumed注释之外,还有其他选择吗? 我问这个问题是因为我真的想把我的JPA模型(例如我的所有实体)具体化,以便能够在使用相同数据库的其他项目之间共享。此外,我不想在我的子项目中出现一些骆驼依赖,这意味着要删除@Consumed注释。我基本上有这样一条路线: 我尝试了不同的解决方案: 在我的camel项目中,使用@Consumed方法扩展子模块