当前位置: 首页 > 知识库问答 >
问题:

Spring集成-拆分后发布订阅通道聚合

蓬祺
2023-03-14

我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息:

.transform(...)
.split()
.channel(KAFKA_OUT_CHANNEL)

在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用:

.transform(...)
.split().
.publishSubscribeChannel(pubSub -> pubSub
        .subscribe(f -> f.channel(KAFKA_OUT_CHANNEL)))

我在弄清楚如何使用DSL在pubSubChannel中实际执行.聚合部分时遇到了问题。到目前为止,我已经尝试过:

.subscribe(f ->  f.channel(KAFKA_OUT_CHANNEL)
.subscribe(f -> f.aggregate(c -> c.processor( ?? ))))

有什么指示吗?

共有2个答案

璩浩广
2023-03-14

这取决于聚合后需要什么-如果您只需要一个有效负载列表,只需使用aggregate()。。。

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        .subscribe(f1 -> f1.handle(System.out::println))
                        .subscribe(f2 -> f2
                                .aggregate()
                                .handle(System.out::println)));
    }

}

如果您只需要计数:

@SpringBootApplication
public class So51059703Application {

    public static void main(String[] args) {
        SpringApplication.run(So51059703Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean("flow.input", MessageChannel.class).send(new GenericMessage<>(
                    Arrays.asList("a", "b", "c")));
        };
    }

    @Bean
    public IntegrationFlow flow() {
        return f -> f
                .split()
                .publishSubscribeChannel(p -> p
                        .subscribe(f1 -> f1.handle(System.out::println))
                        .subscribe(f2 -> f2
                                .aggregate(c -> c
                                        .processor(processor(), "reduce"))
                                .handle(System.out::println)));
    }

    @Bean
    public Object processor() {
        return new Object() {

            public int reduce(List<Message<?>> messages) {
                return messages.size();
            }

        };
    }

}
缪兴腾
2023-03-14

默认情况下,AbstractMessageSplitter有一个applySequence=true:

/**
 * Set the applySequence flag to the specified value. Defaults to true.
 * @param applySequence true to apply sequence information.
 */
public void setApplySequence(boolean applySequence) {

因此,我们在消息中包含以下标题:

if (this.applySequence) {
    builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize);
}

聚合器的默认关联策略实际上基于IntegrationMessageHeaderAccessor。CORRELATION\u ID标题。这样,它将具有相同相关性键的消息收集到相同的消息组中。默认的发布策略基于消息组和sequenceSize头的比较。最后,默认的消息组处理器(MessageGroupProcessor)只需将组中的所有消息收集到一条消息中,并将收集作为有效负载。换句话说,聚合器的默认行为与拆分器完全相反。

我不知道您将从聚合器执行什么输出,但您不需要任何其他逻辑来配置聚合器-关联和释放逻辑应该基于默认状态。

您可以在参考手册中找到足够的信息。

 类似资料:
  • 谢谢你提前阅读。在我的主要方法中,我有一个出版频道 在执行长时间运行流程的服务中,它创建了一个收费计划,我将频道注入其中 这就是我正在努力解决的问题。我想使用CompletableFuture并在另一个Springbean中获得未来事件的有效负载。我需要一个未来来从消息返回有效负载。我想我想创建一个ServiceActivator作为消息的endpoint,但正如我所说的,我需要它来返回未来a的有

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 简介 Redis 的列表类型键可以用来实现队列,并且支持阻塞式读取,所以 Redis 能够非常容易的实现一个高性能的优先队列。同时在更高层面上,Redis 还支持“发布/订阅”的消息模式,可以基于此构建一个聊天系统。 发布示例 发布(Publish)即将消息发布到频道中。示例代码: // 发送消息 Redis::publish('chan-1', 'Hello, World!'); // 发送消息

  • 问题内容: 跟随Redis Pub / Sub 这工作正常,我可以使用以下任何语言发布消息 使用,我可以验证此请求是否已正确发布 当我将订阅者 块 添加到 其他类(侦听器类)中的 该频道时,问题就开始了,如下所示 中的,还表明侦听器已正确订阅 问题是,当我将订户侦听器类添加到相同的Rails应用程序时…它停止工作,导致侦听Redis服务器并停止执行任何其他代码…它只是坐在那里侦听。 因此,有一种方

  • 我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。 到目前为止我所拥有的: 预期的行为如下: 通过网关发送第一个请求 一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。 我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是:

  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每