当前位置: 首页 > 知识库问答 >
问题:

在spring集成中,下游流完成后,如何一次轮询队列1消息

井斌斌
2023-03-14

我目前正致力于提高集成流的性能,试图实现消息处理的并行化。我已经实现了所有使用Java DSL。

当前的集成流使用固定的轮询器从队列通道获取消息,然后依次通过多个处理程序对消息进行串行处理,直到消息到达最终的处理程序,该处理程序将考虑上一个处理程序的每个输出进行一些最终计算。它们都连接在同一个集成流中。基本上,这些处理程序将调用打包到外部系统。这里我需要保留的重要一点是,在完成前一个消息的所有下游流之前,不能从队列中提取消息。我需要并行化的是处理程序。

当前集成流:MessageQueue->轮询器->处理程序1->处理程序2->处理程序X->最终处理程序

我尝试合并并行性,做了以下工作,它工作得很好。

MessageQueue->轮询器->拆分器->执行器->具有子流映射到不同处理程序的路由器->聚合器->最终处理程序

我使用这种方法发现的问题是,在前一条消息通过所有下游流之前,从队列通道获取了一条新消息。很清楚为什么添加拆分器和执行器会改变消息的处理方式,但问题是不同消息的结果之间可能存在依赖关系。

问题是,我如何从队列通道中一次检索一条消息,就像“挂起”轮询器,直到正在处理的消息到达聚合器之后的最后一个endpoint?我不知道如何重新排列组件,或者我还能做什么来实现这一点。

抱歉,我想找答案,但我找不到。这里需要一些指导。非常感谢

@blink这对我来说是有效的,可能需要一些重构,我确信它可以写得更优雅。我不是专家,抱歉。

基本要素是:

>

  • 封装消息系统的接口
  • 调用网关方法时消息路由的消息通道

    @Bean
    public DirectChannel integrationChannel() {
        return MessageChannels.direct().get();
    }
    
    @MessagingGateway
    interface WrappingGateway {
    
        @Gateway(requestChannel = "integrationChannel")
        TrackingLog executeIntegration(TrackingLog trackingLog);
    
    }
    

    TrackingLog是一个模型,我使用它来记录下游流的结果。

    基本上,我在集成流中调用包装网关,它从消息队列中提取消息。

    @Autowired
    WrappingGateway integrationGateway;
    
    @Bean
    public IntegrationFlow createCatalogueChannelFlow() {
        return IntegrationFlows.from(cataloguePriorityChannel())
    
                // Queue Poller
                .bridge(s -> s.poller(Pollers.fixedRate(1, TimeUnit.SECONDS).maxMessagesPerPoll(1)).autoStartup(true)
                        .id("cataloguePriorityChannelBridge"))
    
                // Call to Gateway 
                .handle(m -> {
                    this.integrationGateway
                            .executeIntegration(((TrackingLog) m.getPayload()));
                })
    
                .get();
    }
    
    @Bean
    public IntegrationFlow startCatalogueIntegrationChannelFlow() {
        return IntegrationFlows.from(integrationChannel())
    
                // Log
                .handle(trackerSupportClient, "logMessagePreExecution")
    
                // Set TrackingLog in message Header
                .enrichHeaders(e -> e.headerFunction("TRACKING_LOG", m -> {
                    return ((TrackingLog) m.getPayload());
                }))
     ....
    

    整个集成稍微复杂一点,它从一个异步HTTP网关、变压器、路由器、mongodb中的存储等开始,这里的重点是正如@Artem Bilan建议我的那样,对网关的调用阻塞了线程,并阻止队列轮询器获得更多消息,直到当前消息被完全处理。

    希望这对你有帮助。

  • 共有1个答案

    唐照
    2023-03-14

    那确实是个有趣的任务...我会和你分享我的想法,你会选择最适合你的。

    >

  • 我们总是可以将一部分流包装成@messaginggateway,它应该等待回复。它的子流有多异步已经无关紧要了。因此,您可以并行地执行这些任务,但gateway仍然会在主线程中等待回复,从而阻止队列中的下一轮投票。您应该确保在子流的末尾返回一些内容到replychannel中,以解除对主线程的阻塞。参见此处的文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/messaging-endpoints.html#网关

    我们有一个现成的barriermessagehandler组件。它的重点确实是用一个消息阻塞当前线程,直到某个触发器到达消息所属的关联。这个组件的问题是,您需要弄清楚如何为第一条消息释放屏障,因为这条消息将作为下一条消息的触发器。虽然我们可以使用一次性路由器来绕过第一条消息的障碍...文档在这里:https://Docs.spring.io/spring-integration/Docs/5.3.0.m4/reference/html/message-routing.html#barrier

    我们有一个像MessageSourcePollingTemplate这样的组件。因此,您可以在需要时调用包装到messageSourcelambda中的queuechannel。我现在有点想不出如何将它放入一个流中,但这又是一个如何暂停轮询的想法。参见文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/core.html#deferred-acks-message-source

    另一种方法是将MethodInterceptor添加到Poller配置中,以跳过调用调用。如果某些AtomicBoolean的状态为True则proceed()。这样,您就会保持一个状态,直到消息处理完毕,并且每个轮询任务都将跳过,直到您重置该状态。文档:https://docs.spring.io/spring-integration/docs/5.3.0.m4/reference/html/messaging-endpoints.html#endpoint-pollingconsumer

  •  类似资料:
    • 我正在尝试配置一个轮询器,该轮询器每X秒查询一个bean,以将列表放入一个通道。该通道有一个下游流,该流将列表拆分并输出到发布/子通道(进一步的异步流)。我如何确保在任何给定的时间,只有在流执行过程中,轮询器必须等待/阻止流完成,直到为下一次轮询做好准备(固定速率/延迟)? ...在上进一步异步流以发送出站消息 有没有使用异步切换的阻塞轮询器示例,以及使用barrier向轮询器线程发送完成流信号的

    • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。

    • 需求:构建一个基于。NET的应用程序,该应用程序可以定期从IBM Websphere消息队列读取消息,并将这些消息保存到数据库中

    • 如何在下面的JUnit类中运行integrationFlow?目前出现了例外情况 因为整合流没有启动。 JUnit类: }

    • 我正在努力寻找一个成熟的例子,说明如何在Spring Boot框架中使用ApacheCamel进行轮询。 我已经看过了:https://camel.apache.org/manual/latest/polling-consumer.html除此之外:https://camel.apache.org/components/latest/timer-component.html但是代码示例不够广泛,我

    • 我正在开发一个使用Spring Integration 5.0.1和Spring Boot 2.0.0的应用程序。RC1 目前,应用程序响应并运行一些可能需要一段时间才能完成的初始化代码。这不使用任何Spring集成组件。 我还有一些非常基本的集成流,使用JavaDSL编写,并在配置中声明为bean。 有什么方法可以推迟流何时开始消耗消息吗?我希望能够在初始化完成时手动启动它们。 配置似乎是解决方