我想分割交换消息体(它是MyCustomClass对象的列表),处理它们(一个接一个),然后将所有的交换聚合在一起。拆分可以,一个一个处理也可以,但是我想不出怎么把它们聚合起来。
from("mysource")
.unmarshal(new ListJacksonDataFormat(MyClass.class))
.split().body()
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// process MyClass item
exchange.getIn().setBody(processedItem);
}
})
.to("destinationForProcessedItem")
.aggregate(new GroupedExchangeAggregationStrategy()) <== Seems like problem is here
.process(new Processor() {
// handle result of aggregation
})
我不需要复杂的聚合,只需要收集分离的交换列表,并在最终的处理器中处理它们。
这样写
.aggregate(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange exchange, Exchange exchange1) {
//logic for aggregation using exchnage and exchange1
}
})
使用拆分器中的内置聚合器,请参阅组合消息处理器EIP模式:https://camel.apache.org/components/latest/eips/composed-message-processor.html#_sample.
我从基于apache-camel-spark的rest接口获得一个json数组作为输入。开始时,我想通过apache camels路线分割json-array来处理每个元素。我该怎么做? 我的测试输入json: 对于这个问题,我在stackoverflow上找到了一些间接描述的问题: link 1, link 2, link 3。 根据这些示例,我尝试了以下骆驼路线: 当我这样做时,我总是得到以下
我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:
使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每
目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I
我的模式实现受到了mongo官方网站上这篇教程的影响 这基本上是为时间序列数据设计的模式,我将每个设备每小时的数据存储在单个文档中的数组中。我创建字段组合发送数据和时间的设备id。例如,如果id为的设备在发送数据,则我的 字段将变为
我们在Camel中定义了一个具有拆分和聚合功能的路由,但无法在聚合器之后将异常传播回拆分。这导致即使我们遇到异常,拆分也会运行 下面是不工作的代码 上述代码中的处理器(myProcessor)如下: 但是,当我从路由中移除聚合时,Split能够在异常情况下停止路由。