当前位置: 首页 > 知识库问答 >
问题:

在Spring集成中分组和忽略消息

归鹤龄
2023-03-14

我有一个spring集成应用程序,它处理数据库中的不同交易类型,我将其转换、过滤并路由到相应的tradeEventChannel

新建行ID-

对于一种特定的交易事件类型(repoTradeChannel),有两种可能的情况:

  1. 用户交易开放式回购交易,这转化为一个回购(开放消息)TradeEvent,应该路由到repotradeconnel进行处理。当交易被称为(小时、天或周后)时,会收到第二条消息(术语消息),也应进行处理

在上述两种情况下,TradeEvent/消息可以按idTradeEvent类上的其他属性进行分组。

我的问题是,我是否可以使用聚合器对第二种情况下的消息进行分组,而忽略第二种情况?例如,我可以查看id或action字段来确定哪些是不可接受的。在第二种情况下,消息/事件在几分钟内收到,而在第一种情况下,第二条消息(术语消息)可以延迟数小时、数天或数周。

由于我无法阻止传入消息并进行筛选,是否有策略可用于在聚合时处理这些情况。我需要依赖分组,然后确定第二条消息在第二种情况下没有用处。

实现是什么样子的?我应该使用不同的策略,比如缓存吗?

总之,问题实际上是如何区分在案例1和案例2中接收到的术语消息,其中案例2应该被抑制。


共有1个答案

金毅
2023-03-14

您的问题过于具体,如果没有任何简单的常见解释或代码,很难直接回答。

无论如何,您确实可以基于id或action字段,将聚合器与CorrelationsStrategy一起使用。发布策略应该像MessageCountReleaseStrategy一样简单,阈值为2。

从聚合器输出什么,您可以在消息组处理器实现中决定:您可以将两条消息合并为一条。您仍然可以生成它们的列表,或者只是忽略其中一条消息并只生成另一条。

 类似资料:
  • 使用Java 8 streams,我可以通过分类器对消息进行分组: 我想写一个聚合器,将消息进行相应的分组。在如上所示的五条有效载荷消息中,我想生成三条消息:第一条消息应将“a”作为有效载荷,第二条消息应将三条“b”作为有效载荷,第三条消息应将“c”作为有效载荷。 当达到序列大小时,应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。 在发布策略中,我可以访问序列大小,但我无法找

  • 我想既然查询的语法是: 查询的语法是 忽略案例集合查询的语法为: 但这似乎不起作用(它仍然区分大小写)。 想法? 编辑: 它也不是。

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 嗨! 我需要扫描mysql(5.1)上的一个非常大的表, 该表大致如下: 我需要将main_id+键的所有唯一值获取到一个新表中。 使用以下查询需要花费大量时间(在非常快的服务器上3天后仍在运行): 所以我的问题是- 这会更快吗?

  • 我必须为kafka流消费者设置一个组id,它符合严格的命名约定。 在深入跟踪留档后,我找不到有效的方法。因为我仍然相信我可能有误解,我更喜欢在这里打开一个问题以供同行审查,然后再打开一个bug问题。 一年前就已经问过一个类似的问题,但这个问题不是很夸张,还没有回答,我希望我能在这里对这个问题有更多的见解。 从官方文档的几个来源来看,我看到在我的应用程序的 中配置这应该很容易。 文件中指出,我可以:

  • 我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方