当前位置: 首页 > 知识库问答 >
问题:

分类器的Spring集成组消息

衡翰藻
2023-03-14

使用Java 8 streams,我可以通过分类器对消息进行分组:

Map<String, List<String>> grouped = Arrays.asList("a", "b", "b", "b", "c")
        .stream()
        .collect(Collectors.groupingBy(Function.identity()));

我想写一个聚合器,将消息进行相应的分组。在如上所示的五条有效载荷消息中,我想生成三条消息:第一条消息应将“a”作为有效载荷,第二条消息应将三条“b”作为有效载荷,第三条消息应将“c”作为有效载荷。

当达到序列大小时,应释放所有消息组。基于有效负载的分组工作正常,但消息组永远不会被释放。

在发布策略中,我可以访问序列大小,但我无法找到已处理的项目总数。如何发布分组邮件?

public interface StringGrouper {
    List<Message<?> groupSame(List<String> toGroup);
}   

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .split()
        .aggregate(agg -> agg
            .correlationStrategy(message -> message.getPayload())
            .releaseStrategy(group -> group.getSequenceSize() == /* what? */)) 
        .logAndReply();
}

@Test
public void shouldGroupMessages {
    List<Message<?> grouper
        .groupSame(Arrays.asList("a", "b", "b", "b", "c"));
}

解决方法是根本不使用聚合器,而是将传入列表分组到转换器中。但我希望我可以使用聚合器来实现这一点。

@Bean
public IntegrationFlow groupStringsFlow() {
    return IntegrationFlows.from(StringGrouper.class)
        .<List<String>, Collection<List<String>>>transform(source -> source.stream()
            .collect(Collectors.collectingAndThen(
                Collectors.groupingBy(Function.identity()), 
                grouped -> grouped.values())))
        .split()
        .log() // work with messages
        .aggregate()
        .get();
}

共有1个答案

姬熙云
2023-03-14

使用默认的序列大小释放策略将它们聚合为单个组,并使用自定义输出处理器(MessageGroupProcessor)在有效负载上重新分组,返回集合

 类似资料:
  • 我有一个spring集成应用程序,它处理数据库中的不同交易类型,我将其转换、过滤并路由到相应的tradeEventChannel 新建行ID- 对于一种特定的交易事件类型(repoTradeChannel),有两种可能的情况: 用户交易开放式回购交易,这转化为一个回购(开放消息)

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?

  • 我有一个集成应用程序,大部分工作,但注意到昨天一个消息丢失了。当时,service-activatorendpoint正忙于处理先前的消息。 以下是适用于该问题的配置。

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I