当前位置: 首页 > 知识库问答 >
问题:

聚合器,它根据相关性释放部分组,但保留其余消息

狄宏大
2023-03-14

我想在聚合器上设置相关策略,以便它使用传入文件(作为消息)名称之外的日期来关联文件,以便所有具有今天日期的文件都属于同一组。现在,由于我可能有多天的数据,我可能已经聚合了2天的文件。我想把发布策略建立在一个已完成的文件(消息)的基础上,该文件包括文件名中的日期,因此基本上每天都会有一堆文件和一个已完成的文件。摄取完成的文件应该从聚合器中释放当天的文件,但仍然保留其他日期的文件,直到摄取当天的完成文件为止。

因此,在这个场景中,关联显然很简单,但我不确定的是如何根据关联键发布组中的所有消息,而只是一些特定消息。文档谈到了messagereaper,但这涉及到messagestore的内容,我想在内存中完成所有这些。

让我用一个例子来说明

我将这些文件放在一个目录中,该目录由文件入站通道适配器进行im轮询

文件-1-2014.04.27。dat公司

文件-2-2014.04.27。dat公司

文件-3-2014.04.27。dat公司

done-2014.04.27.dat

file-1-2014.04.28.dat

file-2-2014.04.28.dat

done-2014.04.28.dat

由于这些文件正在轮询中,我在流中有一个聚合器,所有传入的文件都在其中聚合。为了关联,我想我可以提取日期并将其放在correlation_id头中,以便前3个文件被认为属于一个组,然后第2个文件属于第二组...现在,一旦我当时使用了done-2014.04.27.dat文件,我想释放前3个文件以在流中进一步处理,但请坚持

file-1-2014.04.28.dat

file-2-2014.04.28.dat

直到我收到

完成日期:2014年4月28日。dat公司

然后释放这2个文件。

任何帮助都将不胜感激。谢谢

共有1个答案

卓嘉良
2023-03-14

我不知道你说的“相关性很简单”是什么意思,但接着又说,你只想释放团队的一部分。如果他们有不同的日期,那么他们将在不同的组中,因此不需要释放组中的一部分,只需在午夜后(或第二天的任何时间)运行收割器来释放整个组。你为什么需要一个“完成”的消息,这一点都不清楚。

默认情况下,聚合器使用内存中的消息存储(SimpleMessageStore)。

编辑:

只需将完成文件放在同一个组中,并让您的发布策略检测完成文件的存在。您可以使用表达式,但如果组可以很大,则实现ReleaseStrategy并遍历MessageGroup.getMessages()寻找完成文件会更有效。

下一步取决于聚合器的下游。如果使用拆分器将它们拆分回单独的文件,只需添加一个过滤器即可删除完成的文件。如果直接处理文件集合,请忽略已完成的文件,或添加转换器将其从集合中删除。

关于收割者;假设文件实时到达,我只是建议,如果你每天运行一次收割者(比如01:00),组超时时间为30分钟,那么收割者将释放昨天的文件(不需要完成的文件)。

编辑:

请参阅下面我对您的“答案”的评论-您在filesLogger上有2个订阅者。

 类似资料:
  • 我正在尝试做一个GroupBy基于共享ID的GeoJSON功能列表,以便通过使用拆分/聚合来聚合这些功能的单个字段,如下所示: 除非我取消对这三行的注释,否则聚合器永远不会发布组,数据库也不会收到任何更新。如果我将groupTimeout设置为小于5秒,则会丢失部分结果。 我预计发布策略默认为,我预计在处理完所有(拆分)功能后会自动释放所有组(REST服务消息中总共只有129个功能)。手动将其设置

  • 问题内容: 我在pandas数据框上使用来删除没有特定列的最小值的所有行。像这样: 但是,如果我不止这两列,其他列(例如在我的示例中)将被删除。我可以使用保留这些列,还是必须找到一种不同的方式删除行? 我的数据如下: 并应以如下形式结束: 但是我得到的是: 我一直在浏览文档,找不到任何东西。我试过了: 但是这些都不起作用(我在最后一个中意识到,语法是在创建组后进行聚合的)。 问题答案: 方法1:使

  • 如何在Jolt转换JSON数组中保留其他字段,我正在尝试使用通配符,但在最终输出中没有添加字段? 这是我正在使用的输入示例 我使用了下面的Jolt转换,并尝试了通配符: 下面是我的预期输出,其中发生了移位操作,然后需要保持所有其他字段不变 即将到来的实际输出:

  • 我有一个mongo查询,用于展开四个对象数组,并根据匹配条件过滤数据。如何在Spring data mongodb中执行相同的操作 我使用过单次放卷,但找不到任何具有多次放卷和匹配操作的。

  • 我正在构建一个调用许多不同web服务的系统,我希望生成一个关于调用ws后返回的所有错误的报告。 为此,我使用了<代码> 组在完成时过期=“true”/ 和激活剂: 谢谢

  • 按组合并返回结果 1,比如菜单服务,接口一样,但有多种实现,用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。 相关代码可以参考 dubbo 项目中的示例 配置 搜索所有分组 <dubbo:reference interface="com.xxx.MenuService" group="*" merger="true" /> 合并指定分组