我们在Camel中定义了一个路由,并且必须找出是否在处理器中抛出了异常。当我们只有一个处理器时,Camel会在sendbody()方法中重新抛出异常。如果有前面的拆分/聚合,则不会抛出异常。因此下面示例的结果是
before throwing Exception
after sendBody
如果我省略所有从。拆分到。完成大小(1)输出是
before throwing Exception
Exception thrown
任何想法如何找出,如果一个异常发生后分裂?
private static final String DIRECT_START = "direct:start";
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from(DIRECT_START)
.split(body())
.aggregate(constant(true), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
return oldExchange == null ? newExchange : oldExchange;
}
})
.completionSize(1)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
System.out.println("before throwing Exception");
exchange.setException(new Exception());
throw new Exception("my Exception");
}
});
}});
context.start();
ProducerTemplate producer = context.createProducerTemplate();
try {
producer.sendBody(DIRECT_START, Integer.valueOf(42));
System.out.println("after sendBody");
} catch (Exception e) {
System.out.println("Exception thrown");
}
context.stop();
}
为了在之后检查异常,我们找到了一个解决方案。我们向onException()注册了一个ErrorProcess,它将状态设置为上下文属性。
但这不会中断producer.sendBody(…)。我们有非常长时间运行的处理器,我们必须中断。
所以问题是,我们可以配置Camel在sendbody中抛出异常,还是可以在Exceptionhandler中执行此操作?
我强烈建议在Camel in Action(第8.3.5节)中有一章介绍拆分器EIP和异常处理。本节说明:
将自定义AggregationStrategy与Splitter一起使用时,重要的是要知道您负责处理异常。如果您不将异常传播回来,Splitter将假定您已经处理了异常,并忽略它。
您使用了拆分()
方法,但没有指定聚合器。在Camel留档中,他们指定
默认情况下,拆分器将返回原始输入消息
这意味着离开拆分()
方法的交换没有异常,因此不会将异常传播回您的调用代码。从技术上讲,您从处理器抛出的异常在拆分器内部。即使您使用了聚合器,它也与拆分
调用无关,并且您还没有用end()
显式结束拆分
。因此,当您的处理器抛出异常时,拆分器会忽略它,因为您没有提供聚合器来处理和传播异常。
我们可以通过将聚合策略作为参数传递给拆分
调用来测试这一点,如下所示:
.split(body(), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
System.out.println("Aggregating");
return oldExchange == null ? newExchange : oldExchange;
}
})
.log("test") // inside the split/aggregator EIP
.end() // outside the split/aggregator EIP
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
System.out.println("before throwing Exception");
throw new Exception("my Exception");
}
});
您将获得输出:
test
Aggregating
before throwing Exception
Exception thrown
如果您希望处理器位于拆分/聚合器EIP中,如下所示:
.split(body(), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
System.out.println("Aggregating");
return oldExchange == null ? newExchange : oldExchange;
}
})
.process(new Processor() { // inside the split/aggregator EIP
public void process(Exchange exchange) throws Exception {
System.out.println("before throwing Exception");
throw new Exception("my Exception");
}
})
.end(); // outside the split/aggregator EIP
您将获得输出:
before throwing Exception
Aggregating
Exception thrown
注意,在拆分/聚合器EIP中,聚合器是如何在引发异常后运行的?这很重要,因为如果没有聚合器传递异常,拆分器将忽略它。为了使其工作,您需要在聚合器中正确传播异常。例如,在您的代码中,如果newExchange包含一个异常,那么它将被忽略,因为您没有传播它。您需要更改聚合器以添加:
if (newExchange.getException() != null) {
oldExchange.setException(newExchange.getException());
}
注意:如果您在拆分的EIP中有一个调用,并且您将异常设置为正在处理,那么当您调用getException()时,它将不再返回。因此,如果您想处理异常,但仍然通过聚合器传播它们,可以使用exchange.getProperty(exchange.EXCEPTION\u Capture)
您也可以使用。stopOneException()
,如下所示:
.split(body()).stopOnException()
.process(new Processor() { // inside the split/aggregator EIP
public void process(Exchange exchange) throws Exception {
System.out.println("before throwing Exception");
throw new Exception("my Exception");
}
});
这会导致拆分在出现异常时停止,并传播它。但是,当您将聚合器放置在stopOneException()之后时,它将不再工作。我不完全清楚为什么。我猜这是因为聚合器改变了交换对象。
另外请注意,您不需要将处理器中的异常设置为exchange。当处理器抛出异常时,Camel将为您执行此操作。因此,线<代码>交换。setException(new Exception()) 不是必需的。
tl;是的,您可以从分割内部将异常传播到调用方法。您只需要确保它是通过与拆分相关联的聚合器完成的,或者设置stopOnException()。这取决于您试图通过拆分/聚合/处理实现什么,最好的方法是什么。
我正在将CSV文件输入集成到一个更大的系统中,我想对其进行设置,以便将CSV中未正确解析的各行(由)发送到医院队列,并将所有其他工作行转换为一个对象并聚合到一个列表中。但是,我很难将
我正在尝试从目录中选取一个文件,拆分一个文件,并将每一个拆分行添加到ActiveMQ中。我在这个过程中遇到了异常处理的问题。假设目录中的文件是一个二进制文件(可执行文件),那么splitter会显示org.apache.camel.runtimeCamelException和java.nio.charset.MalFormedInputException异常。如果出现这种情况,那么我需要捕获这些异
我试图使用Kerberos从ActiveMQ-Camel桥向Kafka (Cloudera)发送消息。 ActiveMQ v5.15.4 骆驼:2.21.1 Kafka客户端:1.1.0 服务器版本:Apache/2.4.6(CentOS) 骆驼.xml剪刀是: 这是日志中的kafka客户端配置: 日志级别:调试 Jaas文件: 出口: 当我发送消息时,我在调试级别收到以下日志,但消息未被发送:
在camel spring ws中,有没有办法将自定义主体(例如自定义bean)和http状态返回为500? 我试过了 然后在processor公共类ExceptionProcessor实现processor{ } 但是即使身体是我想要的,超文本传输协议状态总是200。 你能帮我吗? 更多信息:我正在使用camel 2.20.2版
无法弄清楚这是怎么回事-试图设置一个路由,只看到cxf连接到soap web服务(我不关心实际数据,也不期望数据实际“工作”,但它不断引发一个我不理解的异常: 我想知道我是否正确配置了它。我想这可能是一个丢失的jar,但当我尝试引入其他jar时,导致了依赖冲突 我正在使用maven依赖项“camel-cxf”来加载我所有的jar配置 “原因:org.apache.cxf.bus.extension
希望你有时间回答我的问题。在过去的几天里,我一直在阅读关于Camel的文章,并设法将一切都设置好并运行起来。现在,我碰到了一个棘手的部分:)。基本上,我在运行时使用Java定义一个路由,并将路由配置放在DB中。路线是可行的,信息从一边流向另一边。但是,当异常发生时,我想知道异常发生在哪里(在哪个路由endpoint之间),在DB中存储当前的交换主体(对进一步处理有用的进行中的消息),由用户更新消息