当前位置: 首页 > 知识库问答 >
问题:

spring集成-拆分器和聚合器

鲜于华容
2023-03-14

目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有IBM mq和webservice作为我们的网关。两个网关都接收消息并发送到拆分器通道,拆分器在其中拆分消息并发送到出站通道(执行器通道)。因此,消息将被并行发送到目的地,状态更新服务激活器将以相同的通道接收到order=2的消息,并发送到聚合器。因此,为了它的良好实施。

问题:如果jms出站网关抛出执行我已经添加了建议作为异常处理程序,它将发送到另一个服务激活器,以更新故障状态到DTO对象,并将具有相同的聚合器通道作为输出,但没有在聚合器通道中接收消息,在这种情况下,聚合器只接收快乐流。

我想聚合出站成功消息和失败消息(其他服务激活器更新状态),然后完整的状态需要发布到响应队列作为另一个出站或作为webservice中的响应。

我试图命令成功的服务激活器和故障错误处理程序服务激活器具有相同的通道,这是聚合器的输入通道,它不工作。

感谢您的指导,继续执行此工作流程

使用Spring集成2.2.2

<channel id="inbound"/>
<channel id="splitterInChannel"/>

<channel id="splitterOutChannel">
<dispatcher task-executor="splitterExecutor"/>
</channel>          
<beans:bean id="splitterExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="5" />
    <beans:property name="maxPoolSize" value="10" />
    <beans:property name="queueCapacity" value="25" />
</beans:bean>
<channel id="ValidatorChannel"/>

<channel id="outBoundjmsChannel"/>
<channel id="outBoundErrorChannel"/>
<channel id="finalOutputChannel"></channel>
<channel id="aggregatorChannel"/>

<jms:inbound-channel-adapter connection-factory="AMQConnectionFactory" 
destination="AMQueue" channel="inbound" auto-startup="true" 
extract-payload="false" acknowledge="transacted"></jms:inbound-channel-adapter>


<service-activator ref="InBoundProcessor" input-channel="inbound" output-channel="splitterInChannel"></service-activator>

<!-- splitter -->
<splitter ref="Splitter" method="splitInput" input-channel="splitterInChannel" output-channel="splitterOutChannel"/>
<!-- validator -->
<service-activator ref="Validator" method="validate" input-channel="splitterOutChannel" output-channel="ValidatorChannel"/>         

<!-- need to add enricher -->
<service-activator ref="Enricher" method="enrich" input-channel="ValidatorChannel" output-channel="outBoundjmsChannel"/>            

<!-- outbound gateway -->

<jms:outbound-channel-adapter channel="outBoundjmsChannel" connection-factory="AMQConnectionFactory" destination-name="outputQueue" 
message-converter="customMessageConvertor" order="1"  >

        <jms:request-handler-advice-chain>
            <beans:bean class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
                <beans:property name="retryTemplate" >
                    <beans:bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
                        <beans:property name="retryPolicy">
                            <beans:bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                            <beans:property name="maxAttempts" value="2" />
                            </beans:bean>
                        </beans:property>
                    <beans:property name="backOffPolicy">
                        <beans:bean class="org.springframework.retry.backoff.FixedBackOffPolicy">
                        <beans:property name="backOffPeriod" value="1000" />
                        </beans:bean>
                        </beans:property>
                    </beans:bean>            
                </beans:property>
                <beans:property name="recoveryCallback">
                    <beans:bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
                        <beans:constructor-arg ref="outBoundErrorChannel" />
                    </beans:bean>
                </beans:property>

            </beans:bean>
        </jms:request-handler-advice-chain>
    </jms:outbound-channel-adapter>



<!-- outBound error processor -->           
<service-activator  ref="ErrorProcessor" method="errorHandling" input-channel="outBoundErrorChannel" output-channel="aggregatorChannel" />          
<!-- Post send processor -->
<service-activator  ref="PostProcessor" method="Postprocessing" input-channel="outBoundjmsChannel" output-channel="aggregatorChannel" order="2"/>           
<!-- aggregator -->
<aggregator ref="Aggregator" correlation-strategy-method="aggregateStrategy" input-channel="aggregatorChannel" output-channel="finalOutputChannel"
release-strategy-method="isRelease" method="aggregate" expire-groups-upon-completion="true"/>


<!-- final processor or responder -->
<service-activator ref="FinalProcessor" method="endProcessing" input-channel="finalOutputChannel"/>         

</beans:beans>

在上面的配置中,到目前为止,我已经将发布策略设置为false,将关联方法设置为空字符串。如果这样做有效,我将为该批生成UUID,并将在拆分器中将UUID附加到Corrate。

在调试上述配置时,我注意到出站错误通道receive每当试图发送到出站适配器时(在我的例子中是两次发送)。我不想在一个应用程序中重新尝试,而在另一个应用程序中,它需要尝试重新发布消息。在这两种情况下,我都希望在最终尝试聚合后将消息发送到出站错误通道,如果失败,我将在ErrorProcessor中将状态更新为failed to send。

两个问题。1.我接收到重复的消息到通道,很难识别最后的失败或成功。2.无法为发布策略制定逻辑,难以确定哪一个是重复的,以及它是否成功。

在上面的例子中,我找不到比较对象的通用方法,因为equals方法没有合适的属性进行比较,而且它也不是比较布尔字段的正确方法。

请帮助我解决这个问题,继续我的工作流程设计和完成。

非常感谢您指导我继续。谢谢你,克里斯·S

目前

public Object errorHandling(Object object){
        OutBoundMessage outBoundMessage = null;
        if(object instanceof MessagingException){

            outBoundMessage =((MessagingException) object).getFailedMessage();
        }else{
            //TODO: log the message
        }

        return outBoundMessage;
    }




    public String aggregateStrategy(OutBoundMessage outBoundMessage){

        //TODO: get the UUID from outbound message and return 
        return "";
    }




public List<OutBoundMessage> splitter(InBoundMessage inBoundMessage){

        String[] message = inBoundMessage.getRawMessage().split(",");
        long uuid = java.util.UUID.randomUUID().getLeastSignificantBits();
        List<OutBoundMessage> outBoundMessagelist = new ArrayList<OutBoundMessage>();
        for (String string : message) {
            OutBoundMessage outBoundMessage = new outBoundMessage();
            outBoundMessage.setCorrelationID(uuid);
            outBoundMessagelist.add(outBoundMessage);
        }

    }

在以下方法中添加为默认false以验证

public boolean isRelease(List<OutBoundMessage> outBoundMessage){
        //TODO: need to define condition for closing the list aggregation
        return false;
    }

共有2个答案

傅茂实
2023-03-14

我终于明白了它的工作原理,

在message convertor中,我将布尔值设置为true,在Errorhandle中,我将其设置为false并返回null,因此恢复是将消息作为失败消息接收到聚合器,并理解当我返回对象时会发生什么。感谢@ArtemBilan,您的代码块让我了解了正在发生的事情以及我应该做什么

董飞
2023-03-14

请分享您的错误处理器源代码。以及相关策略方法="聚合策略"

我想知道您如何处理那里的ErrorMessage,以及如何在ErrorProcessor之后从消息中恢复correlationKey

不确定如何构建自己的correlationKey,但是

对于您的ErrorMessageSendingRecoverer中的ErrorMessage,我可以建议注意那里的异常有效负载,它看起来像(来自ErrorMessageSendingRecoverer源代码):

else if (!(lastThrowable instanceof MessagingException)) {
        lastThrowable = new MessagingException((Message<?>) context.getAttribute("message"),
                lastThrowable.getMessage(), lastThrowable);
    }
....
messagingTemplate.send(new ErrorMessage(lastThrowable));

因此,MessaginException,有一条异常的“有罪”消息,而该消息正好有一个聚合器的适当的相关详细信息头。因此,如果希望将错误聚合到同一个消息组,则应该依赖它们。

 类似资料:
  • 使用Spring Integr中的拆分器,我拆分了从数据库中的表中选择的数据行。每条消息完成处理后,我想像旧消息一样将每条消息聚合到一条消息中。我该怎么办?我不知道拆分器拆分了多少条消息。我只知道拆分消息头中的相关ID。即使我聚合消息,我也无法制定发布策略。 我如何解决这个问题? 以及是否有任何方法可以使用jdbc-out站网关或jdbc-out站通道适配器一次插入多行数据,而无需使用拆分器插入每

  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 我有以下XML负载,我正试图将其用于Spring集成和Spring集成AMQP: 我正在使用xpath拆分器拆分消息: 我工作正常,消息被分成3条新消息,例如使用此有效负载: 在此步骤之后,将使用此设置聚合消息: 作为最后一步,消息将使用此出站通道适配器发送到交换机: 不幸的是,出现了一些问题,因为我最终得到了这样的有效载荷。我需要它保持XML格式。

  • 我正在尝试做一个GroupBy基于共享ID的GeoJSON功能列表,以便通过使用拆分/聚合来聚合这些功能的单个字段,如下所示: 除非我取消对这三行的注释,否则聚合器永远不会发布组,数据库也不会收到任何更新。如果我将groupTimeout设置为小于5秒,则会丢失部分结果。 我预计发布策略默认为,我预计在处理完所有(拆分)功能后会自动释放所有组(REST服务消息中总共只有129个功能)。手动将其设置

  • 我有一个用例,我的消息被拆分两次,我想聚合所有这些消息。如何才能最好地实现这一点,我应该通过引入不同的序列头来聚合消息两次,还是有办法通过重写消息分组的方法在单个聚合步骤中聚合消息?

  • 我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?