我试图理解在Spring集成中聚合时返回的类型,这相当困难。我正在使用Project Reactor,我的代码片段是:
public FluxAggregatorMessageHandler randomIdsBatchAggregator() {
FluxAggregatorMessageHandler f = new FluxAggregatorMessageHandler();
f.setWindowTimespan(Duration.ofSeconds(5));
f.setCombineFunction(messageFlux -> messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new);
return f;
}
@Bean
public IntegrationFlow dataPipeline() {
return IntegrationFlows.from(somePublisher)
// ----> The type Message<?> passed? Or Flux<Message<?>>?
.handle(randomIdsBatchAggregator())
// ----> What type has been returned from the aggregation?
.handle(bla())
.get();
}
除了理解示例中传递的类型之外,我还想知道如何才能知道IntegrationFlow
中流动的对象及其类型。
IntegrationFlows.from(somePublisher)
这将在内部创建一个FluxMessageChannel
,订阅提供的Publshier
。每个事件都会从这个通道发送到它的订阅者——你的聚合器。
FluxAggregatorMessageHandler
生成setCombineFunction()
JavaDocs中解释的内容:
/**
* Configure a transformation {@link Function} to apply for a {@link Flux} window to emit.
* Requires a {@link Mono} result with a {@link Message} as value as a combination result
* of the incoming {@link Flux} for window.
* By default a {@link Flux} for window is fully wrapped into a message with headers copied
* from the first message in window. Such a {@link Flux} in the payload has to be subscribed
* and consumed downstream.
* @param combineFunction the {@link Function} to use for result windows transformation.
*/
public void setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>> combineFunction) {
所以,这是一个带有消息的单声道
,你可以用来实现。collectList()
。当框架从FluxAggregatorMessageHandler
发出回复消息时,它订阅了Mono
。因此你的。句柄(bla())
必须包含有效负载列表。这对于聚合器结果来说是很自然的。
在文档中查看更多:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#flux-aggregator
如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }
我已经建立了一个简单的Spring集成流程,该流程由以下步骤组成: 然后定期轮询一个rest api 对有效载荷做一些处理 并将其置于Kafka主题上。 请遵守以下代码: 这非常有效,然而,我正在努力想出一些好的测试。 我应该如何模拟外部RESTAPI
null 如何在transform()步骤中添加Jaxb2Marshaller?
我尝试在Spring集成中访问flux对象,而不将流声明拆分为两个函数。我想知道如何执行以下操作: 我不介意将我在评论中提到的通量操作转移到另一个类(可能是为了作为某种网关),但对我来说,从同一个函数启动和流显然非常重要,因此它将非常清晰易读,能够理解我在应用程序中所做的事情。我看到了Monos网关的文档,但示例代码甚至不可能(它们讨论的是函数中没有的通量,作为初学者,我很难理解那里发生了什么)。
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
在spring integration (Java DSL)中,如何定义一个完整流程的事务? 通过Spring集成,我们可以定义一个示例流程: 我需要一个跨度整个流程的交易。目前,当我使用“aMessage转换器”访问数据库时,事务将在处理完此消息转换器后关闭。但是我需要一个在处理“另一个消息转换器”时仍未提交的事务? 我希望只需添加一个“@Transactional”(或@Transaction