我们有一些使用方路径(ftp,file,smb)从远程系统读取文件。简化了直接路径测试,但与批处理使用者的行为类似:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
转换后,一次轮询的所有结果将在单独的路径中按批汇总:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
如果每个消费者分开运行,则一切正常。但是,如果多个使用者并行运行,聚合将拆分民意测验。例如,如果文件消费者轮询500条消息,并且第二条路线开始从ftp读取6个文件,则期望得到2个聚合1,其中包含来自文件的500条消息,以及1个聚合,其中6条来自ftp的消息。
测试用例:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
结果是:“ A + A”,“ B”,“ A”,“ B”,“ A”,而不是预期的“ A + A + A”,“ B + B”,“ A”,“ Z” 。问题:
您几乎可以正常工作了。这是您需要的更改(稍后我会解释)。
from("direct:aggregate").id("aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
.completionSize(property(Exchange.BATCH_SIZE))
.to("log:result", "mock:result")
结果将是:
Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A
注意:"Z"
由于批次大小为,您将不会收到结果7
。
解释一下-如您所读, Aggregator 是一个通用的骆驼组件,正确定义的关键是:
现在,在你的情况你是聚合上的属性AGGREGATION_PROPERTY
,这将是A
,B
或Z
。另外,您要指定批次大小。
但是,您并未completionSize()
在路线中表达。取而代之的是,您使用了completionFromBatchConsumer
-做一些不同的事情(代码指出它在寻找Exchange#BATCH_COMPLETE
属性),因此结果很奇怪。
无论如何,.completionSize(Exchange.BATCH_SIZE)
将使您的测试按需运行。
祝你好运。
我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢
我使用的是spring-kafka“2.1.7.Release”,我试图理解max.poll.interval.ms是如何将AckMode作为批处理而将enable.auto.commit作为“false”工作的。这里是我的消费者设置。 这是我的出厂设置 这是我的消费者,我增加了2分钟的延迟 现在,我发布了5条消息,并观察到它处理了所有记录,没有任何问题。但是,如果我将AckMode设置为记录它在
我正在使用spring批处理,我需要实现以下内容 读取包含日期和金额等详细信息的csv文件 将同一日期的所有金额的总和合计 保留一个带有日期和总和的条目 我在过去使用过批处理,我想到了下面的方法。用2个步骤创建批处理。 步骤1: 读取器:使用FlatFileItemReader遍历整个文件 处理器:用键作为日期,值作为金额填充映射。如果存在条目,则获取该值并将其添加到新值 编写器:没有操作编写器,
我试图在运行时使用一个特性聚合路由,到目前为止 显然,上面的代码不起作用,因为项目列表不能传递给。 所以我的问题是,如何将解析为接受,或者如何从的子类动态加载路由?
我想对3条路由使用Apache Camel并行组播,聚合(并等待)其中的2条路由,而让第3条路由自行进行(第3条路由不应阻塞前两条路由)。我还需要在“所有”情况下处理这两个,这意味着如果其中一个失败(例如在处理过程中抛出异常),也应该对其进行聚合。 根据我从Camel文档中了解到的情况,只要不指定StoponException,该行为就应该是“默认的”。但发生的情况是exchange异常永远不会到
我设置了一个HTTPS代理,这样HTTP客户端就可以安全地向代理发送普通HTTP请求。例如,客户机可以向代理发送加密的HTTP GET请求,代理将删除加密并将普通HTTP GET请求发送到终端站点。 我了解到这不是一个常见的设置,只有GoogleChrome具有内置功能来支持这样的场景。(信息在这里-http://wiki.squid-cache.org/Features/HTTPS#Encryp