当前位置: 首页 > 知识库问答 >
问题:

头聚合期间,错误通道和回复通道消失

薄烨
2023-03-14

我有以下工作流程。

>

  • 入站通道

    分流器

    分割通道的任务执行器-所有线程执行相同的工作流。

    3.a .构建请求

    3.b .网关消息endpoint的服务激活器包装。

    3 . c . http-outbound-gateway上带有错误通道配置的网关包装器(用于在调用http-outbound-gateway时处理异常)

    超文本传输协议-出站-网关

    聚合器

    spring集成工作流的响应。

    如果在 3.d 中发生异常,则控制将转到为网关错误通道配置的服务激活器。我只将失败消息中的以下内容复制到新标头,然后复制到传递到错误通道的标头。

    a. 相关性 Id b. 序列编号 c. 序列大小

    但是在聚合拆分器响应时,DefaultAggregatingMessageGroupProcessor.java删除冲突的标头,并在向聚合器提供控制之前删除错误通道和回复通道。

    因此,一旦聚合器完成其操作,它就无法找到回复或错误通道,并导致异常。

    我使用的是SpringIntegrationCore版本2.2.1,我不确定为什么在头聚合期间删除了回复通道和错误通道。

    任何关于解决这个问题的意见都会有很大的帮助。

    谢谢你:)

    编辑1:非常感谢加里帮助我解决这种情况。我正在共享我的当前配置

    <!-- SPLITTER -->
    <int:splitter id="dentalSplitter" ref="dentalServiceSplitter"
        method="getDentalServiceList" input-channel="dentalServiceSplitterChannel"
        output-channel="dentalSplitterTaskChannel" />
    
    <int:channel id="dentalSplitterTaskChannel">
        <int:dispatcher task-executor="dentalTaskExecutor" />
    </int:channel>
    
    <int:chain input-channel="dentalSplitterTaskChannel" output-channel="dentalGatewayChannel">
        <int:header-enricher>
            <int:header name="CHAIN_START_TIME" expression="T(System).currentTimeMillis()" overwrite="true" />
        <int:object-to-json-transformer content-type="application/json"/>               
    </int:chain>
    
    <int:service-activator input-channel="dentalGatewayChannel" ref="dentalGatewayWrapper" output-channel="dentalReplyChannel" />
    <int:gateway id="dentalGatewayWrapper" default-request-channel="dentalCostEstimateRequestChannel" error-channel="dentalErrorChannel"/>
    
    <int-http:outbound-gateway id="dentalGateway"
        url-expression="@urlBuilder.build('${service.endpoint}')"  
        http-method="POST" request-factory="clientHttpRequestFactory"  
        request-channel="dentalCostEstimateRequestChannel" extract-request-payload="true" 
        expected-response-type="com.dental.test.DentalResponse">
        <int-http:request-handler-advice-chain> 
             <ref bean="logChainTimeInterceptor" /> 
        </int-http:request-handler-advice-chain>
    </int-http:outbound-gateway>
    
    <!-- EXCEPTION -->
    <int:chain input-channel="dentalErrorChannel" output-channel="dentalAggregatorChannel">
        <int:transformer ref="commonErrorTransformer" method="dentalGracefulReturn"/>
    </int:chain>
    
    <!-- SUCCESS -->
    <int:chain input-channel="dentalReplyChannel" output-channel="dentalAggregatorChannel">
            <int:filter discard-channel="dentalErrorChannel"
            expression="T(com.dental.util.InvocationOutcomeHelper).isOutcomeSuccess(payload?.metadata?.outcome?.code,payload?.metadata?.outcome?.message)" />
    </int:chain>
    
    <!-- AGGREGATION -->
    
    <int:chain input-channel="dentalAggregatorChannel" output-channel="wsDentalServiceOutputChannel" >
        <int:aggregator ref="dentalServiceAggregator" />
        <int:service-activator ref="dentalResponseServiceActivator" />
    </int:chain>
    

    我注意到的是,每个分离通道在通过网关时都会为错误和回复创建一个新的临时通道,在从网关获得响应后,它会保留保留的(原始入站)错误和回复通道头。正如您所提到的,在控制到达错误转换器后,保留保留头的流程被中断,聚合消息组处理器接收临时通道的三个不同实例,并因此删除它们。我计划有一个定制的消息组处理器,并修改聚合消息头的冲突解决策略,并提出了这个配置。

    <bean id="channelPreservingAggregatingMessageHandler" class="org.springframework.integration.aggregator.AggregatingMessageHandler">
        <constructor-arg name="processor" ref="channelPreservingMessageGroupProcessor"/>
    </bean>
    

    不过,我还没有测试过这一点。但基于这种讨论,这似乎不是一个可行的解决方案。

    看起来我在网关中的错误处理配置不正确。但是,我对您的这种说法感到困惑“不直接转发消息,而只是处理您的错误流上的错误,并正常地将结果返回给网关“包装器”。如果我移除了错误通道,当异常发生时,我如何取回控制权?也许我没明白这里的一些事情。你能详细说明一下吗?

  • 共有1个答案

    拓拔嘉颖
    2023-03-14

    当询问此类场景时,您通常需要显示您的配置。但是,我怀疑您正在将错误流中的消息直接转发到聚合器。

    这就像在代码中执行< code>GOTO并破坏作用域。

    它不起作用,因为错误消息中的< code>replyChannel头是用于网关“包装器”的,而不是原始的上游入站网关。当聚合器得到冲突的头时,它别无选择,只能丢弃这些头(您将看到一条调试日志消息)。

    与其直接转发消息,不如简单地处理错误流上的错误并将结果正常返回到网关“包装器”(只需省略错误流最后一个元素上的错误通道)。

    然后,网关将修改回复,使其与其他消息(好的和坏的)保持一致,并将其转发给聚合器。

    您不需要弄乱错误流中的标题,只需返回您希望与好结果一起聚合的值。

    您应该更新到当前版本,或者至少是2.2.x系列中的最新版本(2.2.6)。

     类似资料:
    • 问题内容: 当我在Go中编写函数时,它应该返回一个值和一个错误,例如 我想在goroutine中执行此createHashedPassword,我想通过通道传递数据。 但是我的问题是,如何在此处或goroutine中处理错误? 问题答案: 通常将多个输出捆绑到一个结构中,然后通过一个通道将它们一起返回。

    • 我正在用Spring-amqp和Spring-Rabbit进行多项测试。我的Maven父是Spring引导启动父:1.2.3。释放拉: 在某一点上,其中一个测试失败了,出现了这个错误,我看不出原因: 如果我改为SpringBootStarter父级:1.3.1。发布所有测试都通过了。 在不同的版本中挖掘,似乎我仍然可以用 但是所有的测试都通过了 1.5.0之间是否有任何相关变更。M1和1.5.0。

    • 我写了一个带有请求和回复的简单消息流。我必须使用两个独立的队列,所以我声明AmqpOutboundAdapter发送消息,声明Amqp入站Adapter接收回复。 它应该适用于@MessagingGateway: ADUsersFindResponseConfig类类似于: 发送消息正常工作,但我在接收消息时遇到问题。我希望收到的消息将被传递到名为FIND_AD_USERS_REPLY_OUTPU

    • 问题内容: 我有一个Node.js Web服务器,该服务器在顶部运行套接字服务器,该服务器是使用Socket.io创建的。基本上,这可行。 我现在想要实现的是将连接的客户端按组进行群集。所以可能有一些客户从而弥补了A组和其他一些客户从而弥补了B组,他们将选择哪个组由他们一adressing特定的URL,无论是属于或。 现在,在Socket.io中,我希望将消息发送到A组中的所有客户端或B组中的所有

    • 我正在通过绑定到不同Webshpere MQ的入站和出站原子和JMS使用带有JTA支持的Spring集成。流程如下: JMS入站通道适配器收到消息 一些转变 输出队列的JMS出站通道适配器 发生错误时,收到消息 异常类型路由器将未处理的错误路由到自定义重新抛出服务,并将处理的错误路由到接收者列表路由器,该路由器将它们发送到2个错误队列 我的问题是,即使消息到达errorChannel下游(在已处理

    • 我们正试图从谷歌驱动器获得推送通知。我们正在开发一个有几个用户的web平台。 我们需要接收任何用户的变化在一个网络钩子。根据文档,不清楚如何构建这些通道及其生命周期。 一旦,我的意思是,它被创建,平台将“永远”收到通知(直到信道被显式删除)? 或者我们需要在每次希望监视更改时创建一个通道(注意:无论用户是否登录到我们的平台中,我们都希望随时监视更改)。我们希望永远接收通知。 那我们该如何处理频道呢