我们在Camel中定义了一个具有拆分和聚合功能的路由,但无法在聚合器之后将异常传播回拆分。这导致即使我们遇到异常,拆分也会运行
下面是不工作的代码
from("direct:MyRoute")
.routeId("MyRouteID")
.split().tokenize("\n", 1)
.streaming().stopOnException()
.choice()
.when(simple("${property.CamelSplitIndex} > 0"))
.unmarshal(domainDataFormat)
.choice()
.when(simple("${property.CamelSplitComplete}"))
.process(
new Processor()
{
@Override
public void process(Exchange exchange) throws Exception
{
exchange.getIn().getHeaders().put(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE, true);
}
}
)
.end()
.aggregate(myAggregationStrategy).constant(true) //if i comment this line split will be stop on exception
.threads().executorService(executorService)
.process(myProcessor).end()
.end();
上述代码中的处理器(myProcessor)如下:
counter.incrementAndGet(); //atomic counter
if(counter.get()==3)
{
exchange.setException(new RuntimeException());
throw new RuntimeCamelException();
}
但是,当我从路由中移除聚合时,Split能够在异常情况下停止路由。
使用复合消息处理器EIP,它是在同一工作单元中拆分聚合(分叉/连接)。
参见文档:http://camel.apache.org/composed-message-processor.html
请参阅“仅拆分器”部分,在该部分中,您可以为拆分器指定聚合策略,使其协同工作。
我想分割交换消息体(它是MyCustomClass对象的列表),处理它们(一个接一个),然后将所有的交换聚合在一起。拆分可以,一个一个处理也可以,但是我想不出怎么把它们聚合起来。 我不需要复杂的聚合,只需要收集分离的交换列表,并在最终的处理器中处理它们。
我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:
目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I
我试图在使用RESTendpoint的骆驼路由中构建一个分割/聚合模式。它需要一个包含请求详细信息列表的请求对象。我想并行处理请求详细信息,然后将聚合结果返回给调用方。我希望这是一个同步调用。 这是我的路线中的代码。 我希望调用的结果是聚合调用(我的响应对象)的输出。但我实际上得到的是REST调用返回的请求对象?? 当我放入更多的日志语句时,我可以看到Split调用正在触发多个线程,这很好。我可以
我的模式实现受到了mongo官方网站上这篇教程的影响 这基本上是为时间序列数据设计的模式,我将每个设备每小时的数据存储在单个文档中的数组中。我创建字段组合发送数据和时间的设备id。例如,如果id为的设备在发送数据,则我的 字段将变为
我从基于apache-camel-spark的rest接口获得一个json数组作为输入。开始时,我想通过apache camels路线分割json-array来处理每个元素。我该怎么做? 我的测试输入json: 对于这个问题,我在stackoverflow上找到了一些间接描述的问题: link 1, link 2, link 3。 根据这些示例,我尝试了以下骆驼路线: 当我这样做时,我总是得到以下