在我的骆驼路线中,我使用来自队列的消息;每条消息都包含标题“pad”(路径)和文件前缀。例如:
消息1: pad="/some/dir ",file="AAA "消息2: pad="/another/dir ",file="BRD "
每条消息,我想要创建一个文件: 消息1: /一些/目录/AAA.tar (包含所有文件 /一些/目录/AAA*) 消息2: /另一个/目录/BRD.tar (包含 /另一个/目录/BRD 中的所有文件.tar)
目录和文件名在另一个路径中收集。
到目前为止,我有这条骆驼路线:
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregate(new TarAggregationStrategy(false,true))
.constant(true)
.completionFromBatchConsumer()
.eagerCheckCompletion()
.parallelProcessing(false)
.setHeader("file", simple("${header.file}"))
.setHeader("pad", simple("${header.pad}"))
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
我在这里有几个问题:-当运行这条路线时,我看到多个“以消息开始”行,然后我看到一个日志行“标上了”-日志行“标上了”实际上是说。tar”,头是空的...——”了。tar“创建的文件存储在”。/ignored”并包含每个jms消息文件头中的一个文件。
这使我相信,聚合发生在我不期望的水平上;我希望聚合pollEnrich的结果,而不是队列上的其他消息。为什么,我怎样才能让它按我想要的方式运行?
另一个是丢失的标头;这可能是由于聚合错误的项目…无论如何,我认为聚合中的setHeader()应该设置它们,但它们无论如何都丢失了;我如何保存它们?
我对骆驼编程比较陌生;所以请原谅我的缺点;代码中的缩进是我认为范围应该是的样子;这可能完全关闭了。我使用的是camel-2.20.1,但可以切换到任何其他版本。
编辑阅读让我稍微改变了路线;如评论中所写;它现在看起来像这样:(TarAggregationStrategy()是在我的CamelContext中创建的,并在那里添加到注册表中)
from("broker1:files.queue")
.log("starting with message ${header.file}")
.pollEnrich()
.simple("file:${header.pad}?antInclude=${header.file}.*")
.aggregationStrategyRef("tarAggregationStrategy")
.log("tarring to: ${header.pad}${header.file}.tar")
.setHeader(Exchange.FILE_NAME, simple("${header.file}.tar"))
.setHeader(Exchange.FILE_PATH, simple("${header.pad}"))
.to("file://ignored")
.log("Going to do other stuff here on ${header.file}");
现在看起来确实好多了;除了由于无法根据堆栈跟踪创建临时文件而没有发生实际tar:
org.apache.camel.component.file.GenericFileOperationFailedException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:174)
at org.apache.camel.processor.PollEnricher.process(PollEnricher.java:280)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1166)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1158)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1055)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Could not make temp file (c9db039a-1585-4e63-85dc-e21ca268b290)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.addFileToTar(TarAggregationStrategy.java:199)
at org.apache.camel.processor.aggregate.tarfile.TarAggregationStrategy.aggregate(TarAggregationStrategy.java:167)
... 19 more
我注意到的是,不能创建临时文件之后(和)之间的部分实际上是主体的内容(我可以将其保留为空,但没有明显的原因,我填充了文件id)
如果您希望保留消息的头,以便它们在聚合后仍然存在,那么您的聚合策略必须做到这一点。我不认为< code > TarAggregationStrategy 会这样做。
将聚合器视为边界。它收集骆驼交换(骆驼包装的消息),并根据聚合策略
创建新的交换。我猜大多数开箱即用的聚合器都专注于合并或附加消息正文,而不是标头。
因此,如果您想要标题<code>标题。文件和头。pad
为了在聚合中生存,您必须在自己的聚合策略中实现这一点。
由于您使用TarA聚合策略,
因此您可以扩展或修饰此策略,只需实现标头内容并将主体内容委托给TarA聚合策略
即可。
我一直在尝试在elasticsearch术语聚合中添加分页。在查询中,我们可以添加分页,如, 这很清楚,但当我想向聚合添加分页时,我读了很多关于它的内容,但找不到任何内容,我的代码如下所示, 是否有任何方法可以使用函数或任何其他建议创建分页?
我试图在聚合器完成后获得一个回复,但是我得到一个异常,我没有指定任何聚合器子项,但是当我指定一个。to()endpoint我没有收到聚合结果。。。这可能吗? 控制器: 聚合器:
问题内容: 我使用SUM聚合计算服务过程的持续时间。执行过程的每一步都将保存在Elasticsearch中的调用ID下。 这是我监视的内容: 过滤: 这将返回该过程的完整持续时间,并且还告诉我该过程的哪一部分是最快的,而哪一部分是最慢的。 接下来,我要通过serviceId 计算 所有已完成过程 的平均 持续时间 。在这种情况下,我只关心每项服务的总时长,因此我可以提供帮助。 如何从total_d
问题内容: 我正在尝试列出聚合中的所有存储桶,但似乎只显示了前10个。 我的搜索: 返回值: 对于此聚合,我有10个以上的键。在此示例中,我将有145个键,并且我希望每个键的计数。桶上有分页吗?我可以全部拿走吗? 我正在使用Elasticsearch 1.1.0 问题答案: size参数应该是术语查询示例的参数: 如文档中所述,仅适用于1.1.0版及更高版本 编辑 根据@PhaedrusTheGr
我不熟悉Mongo中的聚合查询,并且一直在努力产生我想要的输出。我有以下聚合查询: 返回以下结果: 如何修改聚合查询,以便只返回2个文档而不是3个文档?将两个“ABC-123”结果合并为一个结果,并使用带有“bu”和“count”字段的新计数数组,即。 非常感谢
问题内容: 想象一下,我有两种记录:一个存储桶和一个项目,其中存储在存储桶中的项目,而存储桶中的项目可能相对较少(通常不超过4个,从不超过10个)。这些记录被压缩为一个(具有更多存储桶信息的项目),并放置在Elasticsearch中。我要解决的任务是通过依赖项属性的过滤查询一次找到500个存储桶(最大),其中包含所有相关项,而我受困于限制/抵消聚合。我该如何执行此类任务?我看到聚合使我可以控制相