当前位置: 首页 > 面试题库 >

Camel 2.11批处理聚合如何与单独的路由一起工作?

樊琦
2023-03-14
问题内容

我们有一些使用方路径(ftp,file,smb)从远程系统读取文件。简化了直接路径测试,但与批处理使用者的行为类似:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

转换后,一次轮询的所有结果将在单独的路径中按批汇总:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

如果每个消费者分开运行,则一切正常。但是,如果多个使用者并行运行,聚合将拆分民意测验。例如,如果文件消费者轮询500条消息,并且第二条路线开始从ftp读取6个文件,则期望得到2个聚合1,其中包含来自文件的500条消息,以及1个聚合,其中6条来自ftp的消息。

测试用例:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

结果是:“ A + A”,“ B”,“ A”,“ B”,“ A”,而不是预期的“ A + A + A”,“ B + B”,“ A”,“ Z” 。问题:

  1. 我们对聚合的假设是否错误?
  2. 我们如何实现预期的行为?
  3. 如果我们设置了completionTimeout,则表明第一次交换将发生超时-如果仍然有新的交换,则独立吗?

问题答案:

您几乎可以正常工作了。这是您需要的更改(稍后我会解释)。

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

结果将是:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

注意:"Z"由于批次大小为,您将不会收到结果7

解释一下-如您所读, Aggregator 是一个通用的骆驼组件,正确定义的关键是:

  • 聚合表达式
  • 完成规则

现在,在你的情况你是聚合上的属性AGGREGATION_PROPERTY,这将是ABZ。另外,您要指定批次大小。

但是,您并未completionSize()在路线中表达。取而代之的是,您使用了completionFromBatchConsumer-做一些不同的事情(代码指出它在寻找Exchange#BATCH_COMPLETE属性),因此结果很奇怪。

无论如何,.completionSize(Exchange.BATCH_SIZE)将使您的测试按需运行。

祝你好运。



 类似资料:
  • 我正试图弄清楚如何使用Spring Batch进行聚合。例如,我有一个带有姓名列表的CSV文件: 我想要文本文件中的姓名计数: 根据我从Spring Batch中学到的,ETL批处理过程(itemReader- Spring Batch是正确的工具吗?还是我应该用Spark?谢谢

  • 我使用的是spring-kafka“2.1.7.Release”,我试图理解max.poll.interval.ms是如何将AckMode作为批处理而将enable.auto.commit作为“false”工作的。这里是我的消费者设置。 这是我的出厂设置 这是我的消费者,我增加了2分钟的延迟 现在,我发布了5条消息,并观察到它处理了所有记录,没有任何问题。但是,如果我将AckMode设置为记录它在

  • 我正在使用spring批处理,我需要实现以下内容 读取包含日期和金额等详细信息的csv文件 将同一日期的所有金额的总和合计 保留一个带有日期和总和的条目 我在过去使用过批处理,我想到了下面的方法。用2个步骤创建批处理。 步骤1: 读取器:使用FlatFileItemReader遍历整个文件 处理器:用键作为日期,值作为金额填充映射。如果存在条目,则获取该值并将其添加到新值 编写器:没有操作编写器,

  • 我试图在运行时使用一个特性聚合路由,到目前为止 显然,上面的代码不起作用,因为项目列表不能传递给。 所以我的问题是,如何将解析为接受,或者如何从的子类动态加载路由?

  • 我想对3条路由使用Apache Camel并行组播,聚合(并等待)其中的2条路由,而让第3条路由自行进行(第3条路由不应阻塞前两条路由)。我还需要在“所有”情况下处理这两个,这意味着如果其中一个失败(例如在处理过程中抛出异常),也应该对其进行聚合。 根据我从Camel文档中了解到的情况,只要不指定StoponException,该行为就应该是“默认的”。但发生的情况是exchange异常永远不会到

  • 我设置了一个HTTPS代理,这样HTTP客户端就可以安全地向代理发送普通HTTP请求。例如,客户机可以向代理发送加密的HTTP GET请求,代理将删除加密并将普通HTTP GET请求发送到终端站点。 我了解到这不是一个常见的设置,只有GoogleChrome具有内置功能来支持这样的场景。(信息在这里-http://wiki.squid-cache.org/Features/HTTPS#Encryp