当前位置: 首页 > 知识库问答 >
问题:

Apache Camel:聚合pollEnrich结果而不是from以及如何保留头

蓬意致
2023-03-14

在我的骆驼路线中,我使用来自队列的消息;每条消息都包含标题“pad”(路径)和文件前缀。例如:

消息1: pad="/some/dir ",file="AAA "消息2: pad="/another/dir ",file="BRD "

每条消息,我想要创建一个文件: 消息1: /一些/目录/AAA.tar (包含所有文件 /一些/目录/AAA*) 消息2: /另一个/目录/BRD.tar (包含 /另一个/目录/BRD 中的所有文件.tar)

目录和文件名在另一个路径中收集。

到目前为止,我有这条骆驼路线:

from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
    .simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregate(new TarAggregationStrategy(false,true))
     .constant(true)
     .completionFromBatchConsumer()
     .eagerCheckCompletion()
     .parallelProcessing(false)
     .setHeader("file", simple("${header.file}"))
     .setHeader("pad", simple("${header.pad}"))
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");

我在这里有几个问题:-当运行这条路线时,我看到多个“以消息开始”行,然后我看到一个日志行“标上了”-日志行“标上了”实际上是说。tar”,头是空的...——”了。tar“创建的文件存储在”。/ignored”并包含每个jms消息文件头中的一个文件。

这使我相信,聚合发生在我不期望的水平上;我希望聚合pollEnrich的结果,而不是队列上的其他消息。为什么,我怎样才能让它按我想要的方式运行?

另一个是丢失的标头;这可能是由于聚合错误的项目…无论如何,我认为聚合中的setHeader()应该设置它们,但它们无论如何都丢失了;我如何保存它们?

我对骆驼编程比较陌生;所以请原谅我的缺点;代码中的缩进是我认为范围应该是的样子;这可能完全关闭了。我使用的是camel-2.20.1,但可以切换到任何其他版本。

编辑阅读让我稍微改变了路线;如评论中所写;它现在看起来像这样:(TarAggregationStrategy()是在我的CamelContext中创建的,并在那里添加到注册表中)

from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
    .simple("file:${header.pad}?antInclude=${header.file}.*")
    .aggregationStrategyRef("tarAggregationStrategy")
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");

现在看起来确实好多了;除了由于无法根据堆栈跟踪创建临时文件而没有发生实际tar:

org.apache.camel.component.file.GenericFileOperationFailedException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:174)
        at org.apache.camel.processor.PollEnricher.process(PollEnricher.java:280)
        at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
        at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
        at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
        at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
        at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.addFileToTar(TarAggregationStrategy.java:199)
        at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:167)
        ... 19 more

我注意到的是,不能创建临时文件之后(和)之间的部分实际上是主体的内容(我可以将其保留为空,但没有明显的原因,我填充了文件id)

共有1个答案

澹台俊达
2023-03-14

如果您希望保留消息的头,以便它们在聚合后仍然存在,那么您的聚合策略必须做到这一点。我不认为< code > TarAggregationStrategy 会这样做。

将聚合器视为边界。它收集骆驼交换(骆驼包装的消息),并根据聚合策略创建新的交换。我猜大多数开箱即用的聚合器都专注于合并或附加消息正文,而不是标头。

因此,如果您想要标题<code>标题。文件和头。pad为了在聚合中生存,您必须在自己的聚合策略中实现这一点。

由于您使用TarA聚合策略,因此您可以扩展或修饰此策略,只需实现标头内容并将主体内容委托给TarA聚合策略即可。

 类似资料:
  • 我一直在尝试在elasticsearch术语聚合中添加分页。在查询中,我们可以添加分页,如, 这很清楚,但当我想向聚合添加分页时,我读了很多关于它的内容,但找不到任何内容,我的代码如下所示, 是否有任何方法可以使用函数或任何其他建议创建分页?

  • 我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:

  • 问题内容: 我使用SUM聚合计算服务过程的持续时间。执行过程的每一步都将保存在Elasticsearch中的调用ID下。 这是我监视的内容: 过滤: 这将返回该过程的完整持续时间,并且还告诉我该过程的哪一部分是最快的,而哪一部分是最慢的。 接下来,我要通过serviceId 计算 所有已完成过程 的平均 持续时间 。在这种情况下,我只关心每项服务的总时长,因此我可以提供帮助。 如何从total_d

  • 问题内容: 我正在尝试列出聚合中的所有存储桶,但似乎只显示了前10个。 我的搜索: 返回值: 对于此聚合,我有10个以上的键。在此示例中,我将有145个键,并且我希望每个键的计数。桶上有分页吗?我可以全部拿走吗? 我正在使用Elasticsearch 1.1.0 问题答案: size参数应该是术语查询示例的参数: 如文档中所述,仅适用于1.1.0版及更高版本 编辑 根据@PhaedrusTheGr

  • 我不熟悉Mongo中的聚合查询,并且一直在努力产生我想要的输出。我有以下聚合查询: 返回以下结果: 如何修改聚合查询,以便只返回2个文档而不是3个文档?将两个“ABC-123”结果合并为一个结果,并使用带有“bu”和“count”字段的新计数数组,即。 非常感谢

  • 问题内容: 想象一下,我有两种记录:一个存储桶和一个项目,其中存储在存储桶中的项目,而存储桶中的项目可能相对较少(通常不超过4个,从不超过10个)。这些记录被压缩为一个(具有更多存储桶信息的项目),并放置在Elasticsearch中。我要解决的任务是通过依赖项属性的过滤查询一次找到500个存储桶(最大),其中包含所有相关项,而我受困于限制/抵消聚合。我该如何执行此类任务?我看到聚合使我可以控制相