我目前正致力于提高集成流的性能,试图实现消息处理的并行化。我已经实现了所有使用Java DSL。
当前的集成流使用固定的轮询器从队列通道获取消息,然后依次通过多个处理程序对消息进行串行处理,直到消息到达最终的处理程序,该处理程序将考虑上一个处理程序的每个输出进行一些最终计算。它们都连接在同一个集成流中。基本上,这些处理程序将调用打包到外部系统。这里我需要保留的重要一点是,在完成前一个消息的所有下游流之前,不能从队列中提取消息。我需要并行化的是处理程序。
当前集成流:MessageQueue->轮询器->处理程序1->处理程序2->处理程序X->最终处理程序
我尝试合并并行性,做了以下工作,它工作得很好。
MessageQueue->轮询器->拆分器->执行器->具有子流映射到不同处理程序的路由器->聚合器->最终处理程序
我使用这种方法发现的问题是,在前一条消息通过所有下游流之前,从队列通道获取了一条新消息。很清楚为什么添加拆分器和执行器会改变消息的处理方式,但问题是不同消息的结果之间可能存在依赖关系。
问题是,我如何从队列通道中一次检索一条消息,就像“挂起”轮询器,直到正在处理的消息到达聚合器之后的最后一个endpoint?我不知道如何重新排列组件,或者我还能做什么来实现这一点。
抱歉,我想找答案,但我找不到。这里需要一些指导。非常感谢
@blink这对我来说是有效的,可能需要一些重构,我确信它可以写得更优雅。我不是专家,抱歉。
基本要素是:
>
调用网关方法时消息路由的消息通道
@Bean
public DirectChannel integrationChannel() {
return MessageChannels.direct().get();
}
@MessagingGateway
interface WrappingGateway {
@Gateway(requestChannel = "integrationChannel")
TrackingLog executeIntegration(TrackingLog trackingLog);
}
TrackingLog是一个模型,我使用它来记录下游流的结果。
基本上,我在集成流中调用包装网关,它从消息队列中提取消息。
@Autowired
WrappingGateway integrationGateway;
@Bean
public IntegrationFlow createCatalogueChannelFlow() {
return IntegrationFlows.from(cataloguePriorityChannel())
// Queue Poller
.bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
.id("cataloguePriorityChannelBridge"))
// Call to Gateway
.handle(m -> {
this.integrationGateway
.executeIntegration(((TrackingLog) m.getPayload()));
})
.get();
}
@Bean
public IntegrationFlow startCatalogueIntegrationChannelFlow() {
return IntegrationFlows.from(integrationChannel())
// Log
.handle(trackerSupportClient, "logMessagePreExecution")
// Set TrackingLog in message Header
.enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
return ((TrackingLog) m.getPayload());
}))
....
整个集成稍微复杂一点,它从一个异步HTTP网关、变压器、路由器、mongodb中的存储等开始,这里的重点是正如@Artem Bilan建议我的那样,对网关的调用阻塞了线程,并阻止队列轮询器获得更多消息,直到当前消息被完全处理。
希望这对你有帮助。
那确实是个有趣的任务...我会和你分享我的想法,你会选择最适合你的。
>
我们总是可以将一部分流包装成@messaginggateway
,它应该等待回复。它的子流有多异步已经无关紧要了。因此,您可以并行地执行这些任务,但gateway仍然会在主线程中等待回复,从而阻止队列中的下一轮投票。您应该确保在子流的末尾返回一些内容到replychannel
中,以解除对主线程的阻塞。参见此处的文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/messaging-endpoints.html#网关
我们有一个现成的barriermessagehandler
组件。它的重点确实是用一个消息阻塞当前线程,直到某个触发器到达消息所属的关联。这个组件的问题是,您需要弄清楚如何为第一条消息释放屏障,因为这条消息将作为下一条消息的触发器。虽然我们可以使用一次性路由器来绕过第一条消息的障碍...文档在这里:https://Docs.spring.io/spring-integration/Docs/5.3.0.m4/reference/html/message-routing.html#barrier
我们有一个像MessageSourcePollingTemplate
这样的组件。因此,您可以在需要时调用包装到messageSource
lambda中的queuechannel
。我现在有点想不出如何将它放入一个流中,但这又是一个如何暂停轮询的想法。参见文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/core.html#deferred-acks-message-source
另一种方法是将MethodInterceptor
添加到Poller
配置中,以跳过调用调用。如果某些
。这样,您就会保持一个状态,直到消息处理完毕,并且每个轮询任务都将跳过,直到您重置该状态。文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/messaging-endpoints.html#endpoint-pollingconsumerAtomicBoolean
的状态为True
则proceed()
我正在尝试配置一个轮询器,该轮询器每X秒查询一个bean,以将列表放入一个通道。该通道有一个下游流,该流将列表拆分并输出到发布/子通道(进一步的异步流)。我如何确保在任何给定的时间,只有在流执行过程中,轮询器必须等待/阻止流完成,直到为下一次轮询做好准备(固定速率/延迟)? ...在上进一步异步流以发送出站消息 有没有使用异步切换的阻塞轮询器示例,以及使用barrier向轮询器线程发送完成流信号的
如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。
需求:构建一个基于。NET的应用程序,该应用程序可以定期从IBM Websphere消息队列读取消息,并将这些消息保存到数据库中
如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }
我正在努力寻找一个成熟的例子,说明如何在Spring Boot框架中使用ApacheCamel进行轮询。 我已经看过了:https://camel.apache.org/manual/latest/polling-consumer.html除此之外:https://camel.apache.org/components/latest/timer-component.html但是代码示例不够广泛,我
我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方