当前位置: 首页 > 知识库问答 >
问题:

spring集成:多步多通道订户

左丘恩
2023-03-14

我需要实现一个由多个步骤组成的集成流程,每个步骤都可以由不同数量的处理器(插件)执行。

到目前为止我所拥有的:

<!-- gateway -->
<int:gateway default-request-channel="step/1" service-interface="ServiceGateway">
    <int:method name="send" />
</int:gateway>


<!-- plugin 1 -->
<int:publish-subscribe-channel id="step/1" apply-sequence="true" />

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer" />
</int:service-activator>

<int:service-activator input-channel="step/1" output-channel="step/2">
    <bean class="Transformer2" />
</int:service-activator>


<!-- plugin 2 -->
<int:publish-subscribe-channel id="step/2" apply-sequence="true" />

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="Transformer3" />
</int:service-activator>

<int:service-activator input-channel="step/2" output-channel="end">
    <bean class="HttpTransformer4" />
</int:service-activator>

<!-- aggregation -->
<int:channel id="end" />
<int:aggregator input-channel="end" />

预期的行为如下:

  1. 通过网关发送第一个请求

一切正常,但结果不是预期的,我只收到2个(随机)项目,而不是4个。

我认为问题在于聚合器仅在两个项目之后触发发布,因为“step/2”通道中的“apply sequence”覆盖了“step/1”中的“apply sequence”。所以问题是:如何让聚合器等待所有消息?

提前谢谢你。

定制发布策略:

@SuppressWarnings("unchecked")
@Override
public boolean canRelease ( MessageGroup group ) {

    MessageHeaders headers = group.getOne ().getHeaders ();
    List<List<Object>> sequenceDetails = (List<List<Object>>) headers.get ( "sequenceDetails" );
    System.out.println ( sequenceDetails );

    int expectedSize = 1;
    //map message id, max group size reached (i.e. sequenceNumber==sequenceSize)
    for ( List<Object> sequenceItem : sequenceDetails ) {
        if ( sequenceItem.get ( 1 ) != sequenceItem.get ( 2 ) ) {
            System.err.println ( "--> AGG: no release check, group max not reached" );
            return false;
        }
        expectedSize *= (int) sequenceItem.get ( 2 );//multiplies the group sizes
    }

    int expectedSize2 = expectedSize * (int) headers.get ( "sequenceSize" );

    int currentSize = group.getMessages ().size () * expectedSize;
    System.err.println ( "--> AGG: " + expectedSize2 + " : " + currentSize );
    boolean canRelease = expectedSize2 == currentSize;
    if ( canRelease ) {
        System.out.println ( "ok" );
    }
    return canRelease;
}

打印出来:

[[7099b583-55d4-87d3-4502-993f05bfb388,1,2]]

--

[[7099b583-55d4-87d3-4502-993f05bfb388,1,2]]

--

[[7099b583-55d4-87d3-4502-993f05bfb388,2,2]]

--

[[7099b583-55d4-87d3-4502-993f05bfb388,2,2]]

--

聚合代码:

@Aggregator
public Object aggregate ( List<Message<?>> objects ) {

    List<Object> res = new ArrayList<> ();
    for ( Message<?> m : objects ) {
        res.add ( m.getPayload () );
        MessageHeaders headers2 = m.getHeaders ();
        System.out.println ( headers2.get ( "history" ) );
    }

    return res;
}

打印出来:

网关2,核心通道,(内部bean)#57018165,异步/步骤/1,核心通道,(内部bean)#57018165,异步/步骤/2,核心通道,(内部bean)#57018165,end2

网关2,核心通道,(内部bean)#57018165,异步/步骤/1,核心通道,(内部bean)#57018165,异步/步骤/2,核心通道,(内部bean)#57018165,end2

[102, 202] --

共有1个答案

高锦
2023-03-14

使用自定义发布策略。第二个pubsub将来自第一个pubsub的相关数据推送到setence细节头中的堆栈上。

编辑

问题是有两个群体;您需要在初始correlationId上进行关联。这里有一个纯SpEL解决方案;使用自定义关联/发布策略来确保数据如预期的那样(并使用getOne()而不是迭代器)可能更安全。。。

<int:aggregator input-channel="end2"
        correlation-strategy-expression=
           "headers['sequenceDetails'][0][0]"
        release-strategy-expression=
           "size() == iterator().next().headers['sequenceSize'] * iterator().next().headers['sequenceDetails'][0][2]" />
 类似资料:
  • 如何在不使用XML的情况下将2个通道输出到具有Spring集成的单个通道。类似于以下问题多通道的消息进入单通道 在我的上下文中,我有2个PollableChannel bean,我希望将消息从这两个bean(非聚合)路由到一个@ServiceActivator,即完成如下操作:

  • 目前,我正在开发一个Spring集成应用程序,该应用程序具有以下场景。 有一个转换器可以将传入的消息转换为特定的对象类型 转换完成后,我们需要将其写入日志文件和数据库表,然后最终发送到JMS出站适配器。 null

  • 我在我们的项目中引入了spring集成,而不是遗留集成架构。该体系结构支持发送者和累犯。每个发件人可以配置3个目的地。 null Spring integration gateways看起来很合适。我可以使用default-request-channel来表示主流,error-channel来表示失败流。备份流的问题。如何复制网关传入消息并将其放置到备份通道? 更准确地说,这里是一个测试和代码。

  • 我是Spring集成的新手,正在研究一个从单个通道向多个通道发送消息的示例,从这个角度来看,为每个通道使用Redis消息存储,目的是不丢失任何消息。要求将消息发送到通道-replyChannel、mailChannel和dbChannel。目前,代码只打印sysout语句,没有主要功能。 为了检查消息是否被正确路由,我编写了一个java测试类来发送15条消息。 检查输出,我发现一些消息正在丢失。也

  • 我有一个基于DSL的流,它使用拆分迭代对象列表并发送Kafka消息: 在所有消息发出后,我需要调用服务,还需要记录处理了多少消息。我知道一种方法是使用publishSubscribeChannel,其中第一个subscribe执行实际的Kafka发送,然后聚合执行服务调用: 我在弄清楚如何使用DSL在pubSubChannel中实际执行部分时遇到了问题。到目前为止,我已经尝试过: 有什么指示吗?

  • 但我得到的错误如下: POM: 我按以下方式配置入站网关: 并且,服务激活器: 顺便说一句,只有当我在服务激活器中删除outputChannel="outputChannel"时,它才有效。 这个问题有什么解释吗,我有什么误解吗?