当前位置: 首页 > 知识库问答 >
问题:

Camel在动态完成大小上使用聚合和完成

苗承
2023-03-14

嗨,我有一个骆驼路线,它分割一条传入的消息,然后我想聚合这条消息,但我不知道会分割多少条消息。

我使用了以下方法:

.aggregate(new AggregateStrategy()).header("uuid").completionSize(header("CamelSplitSize"))

这不起作用,而且挂起了……不过,如果我将完成大小设置为一个数值,它就起作用了。

有人知道如何动态聚合,并等待完成。顺便说一下,标题是在分割之前设置的。

共有1个答案

乜昆
2023-03-14

您不需要聚合器,也不需要完成大小。您只需要一个具有聚合策略的拆分器EIP。

在链接的示例中,您可以看到这样的拆分器会对之前拆分的内容进行重新聚合。无论分割产生了多少部分。

// a Splitter with an AggregationStrategy
.split([yourSplitCriteria], new MyAggregationStrategy())
    // each splitted message part is sent to this bean 
    .to("bean:PartProcessor")
    // you can do whatever you want in here
    // end the split to re-aggregate the message parts
.end()
// here, after the split-ending you get the re-aggregated messages
 类似资料:
  • 在我的Apache Camel应用程序中,我有一条非常简单的路线: 也就是说,它从AWS SQS获取消息,以100条为一批进行分组,然后通过HTTP发送到某个地方。 与来自SQS的消息的交换在进入聚合阶段时成功完成,此时将它们从队列中删除。 问题是,聚合的交换可能会出现问题(传递时可能会出错),消息将丢失。我真的希望这些原始交换只有在它们所在的聚合交换也成功完成(传递了一批消息)时才能成功完成(从

  • 我需要计算15分钟内A发生的次数和B发生的次数。该流可能是A1,A2,B1,B2,A3,B3,B4,B5,A4,A5,A6,A7,B6 .在我的例子中,事件结果是A2,B1 A3,B3 A7,B6 .我需要接收匹配发生时的实时结果。我有点累了。我认为只有使用flink cep才能做到这一点.但是flink-sql-cep不支持聚合。它只计算发生的事件。在这种情况下,如何用一条SQL完成这项任务。

  • 在我的web应用程序中,我正在使用Play!构建在Akka基础上的用于管理线程的框架。在一个特定的案例中,我组合了许多从外部服务收集数据的CompletionStages,我希望控制并行请求的数量,以便不使那些外部服务负担过重。在不更改整个应用程序的情况下做到这一点的一种方法是控制Akka使用的线程池大小。现在我在akka like中准备了两个线程池,并尝试在这两个池之间进行切换。我正在使用下面这

  • 问题内容: 语境 我们有一个批处理作业,可将本地化的国家名称(即国家名称翻译为不同语言)从外部复制到我们的数据库中。这个想法是在1个块中处理单个国家/地区的所有本地化的国家名称(即第一个块- 安道尔的所有翻译,下一个块- 阿联酋的所有翻译,等等)。我们使用读取外部数据和一些oracle分析功能来提供该国家/地区可用的翻译总数: 问题 因此,按块分割此输入看起来很简单:在读取了其中指定的确切行数后停

  • 我有一个集合xyz,我需要在不使用MongoDB的情况下找到列分支的所有唯一值。distinct方法。我知道我们可以通过group by来完成,这会给我一个对象数组作为结果,但我需要字符串数组作为输出,比如['a','b','c'。我不知道如何以这种方式格式化它,仅在mongo查询中。 数据库。getCollection(“xyz”)。聚合([{$组:{“\u id”:“$分支”},}])

  • 其实针对大多应用场景,DNS 是不会频繁变更的,使用 Nginx 默认的 resolver 配置方式就能解决。 对于部分应用场景,可能需要支持的系统众多:win、centos、ubuntu 等,不同的操作系统获取 DNS 的方法都不太一样。再加上我们使用 Docker,导致我们在容器内部获取 DNS 变得更加难以准确。 如何能够让 Nginx 使用随时可以变化的 DNS 源,成为我们急待解决的问题