当前位置: 首页 > 知识库问答 >
问题:

如何查看Spring集成的集成流中的类型

严子默
2023-03-14

我试图理解在Spring集成中聚合时返回的类型,这相当困难。我正在使用Project Reactor,我的代码片段是:

public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
    FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
    f.setWindowTimespan(Duration.ofSeconds(5));
    f.setCombineFunction(messageFlux -> messageFlux
        .map(Message::getPayload)
        .collectList()
        .map(GenericMessage::new);
    return f;
}

@Bean
public IntegrationFlow dataPipeline() {
   return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
      .handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
      .handle(bla())
      .get();
}

除了理解示例中传递的类型之外,我还想知道如何才能知道IntegrationFlow中流动的对象及其类型。

共有1个答案

林英锐
2023-03-14
IntegrationFlows.from(somePublisher)

这将在内部创建一个FluxMessageChannel,订阅提供的Publshier。每个事件都会从这个通道发送到它的订阅者——你的聚合器。

FluxAggregatorMessageHandler生成setCombineFunction()JavaDocs中解释的内容:

/**
 * Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
 * Requires a {@link Mono} result with a {@link Message} as value as a combination result
 * of the incoming {@link Flux} for window.
 * By default a {@link Flux} for window is fully wrapped into a message with headers copied
 * from the first message in window. Such a {@link Flux} in the payload has to be subscribed
 * and consumed downstream.
 * @param combineFunction the {@link Function} to use for result windows transformation.
 */
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {

所以,这是一个带有消息的单声道,你可以用来实现。collectList()。当框架从FluxAggregatorMessageHandler发出回复消息时,它订阅了Mono。因此你的。句柄(bla())必须包含有效负载列表。这对于聚合器结果来说是很自然的。

在文档中查看更多:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator

 类似资料:
  • 如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }

  • 我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI

  • null 如何在transform()步骤中添加Jaxb2Marshaller?

  • 我尝试在Spring集成中访问flux对象,而不将流声明拆分为两个函数。我想知道如何执行以下操作: 我不介意将我在评论中提到的通量操作转移到另一个类(可能是为了作为某种网关),但对我来说,从同一个函数启动和流显然非常重要,因此它将非常清晰易读,能够理解我在应用程序中所做的事情。我看到了Monos网关的文档,但示例代码甚至不可能(它们讨论的是函数中没有的通量,作为初学者,我很难理解那里发生了什么)。

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

  • 在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction