当前位置: 首页 > 知识库问答 >
问题:

配置Spring集成聚合器以组合来自RabbitMq扇出交换的响应

石正奇
2023-03-14

我正在尝试使用Spring集成配置以下内容:

  1. 向频道发送消息。
  2. 将此消息发布到与 n 个使用者的兔子扇出(发布/订阅)交换。
  3. 每个使用者都提供一条响应消息。
  4. 让 Spring 集成聚合这些响应,然后再将它们返回给原始客户端。

到目前为止,我有一些问题。。。

>

  • 我使用发布订阅通道来设置apply sequence=“true”属性,以便correlationId、sequenceSize

    sequenceSize 属性仅设置为 1,即使在扇出交换中注册了 2 个队列也是如此。据推测,这意味着消息将过早地从聚合器中释放出来。我希望这是因为我滥用了发布-订阅通道来使用 apply-sequence=“true”,并且非常正确地说只有一个订阅者,int-amqp:outbound-gateway

    我的出站Spring配置如下:

    <int:publish-subscribe-channel id="output" apply-sequence="true"/>
    
    <int:channel id="reply">
        <int:interceptors>
            <int:wire-tap channel="logger"/>
        </int:interceptors>
    </int:channel>
    
    <int:aggregator input-channel="reply" method="combine">
        <bean class="example.SimpleAggregator"/>
    </int:aggregator>
    
    <int:logging-channel-adapter id="logger" level="INFO"/>
    
    <int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
    
    <int-amqp:outbound-gateway request-channel="output"
                                       amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                       reply-channel="reply"/>
    

    我的 rabbitMQ 配置如下:

    <rabbit:connection-factory id="connectionFactory" />
    
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
    
    <rabbit:admin connection-factory="connectionFactory" />
    
    <rabbit:queue name="a-queue"/>
    <rabbit:queue name="b-queue"/>
    
    <rabbit:fanout-exchange name="fanout-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="a-queue" />
            <rabbit:binding queue="b-queue" />
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    

    消费者看起来像这样:

    <int:channel id="input"/>
    
    <int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
    
    <bean id="listenerService" class="example.ListenerService"/>
    
    <int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
    

    任何建议都很好,我怀疑我在某个地方拿错了棍子的一端…

    基于 Gary 评论的新出站 Spring 配置:

    <int:channel id="output"/>
    
    <int:header-enricher input-channel="output" output-channel="output">
        <int:correlation-id expression="headers['id']" />
    </int:header-enricher>
    
    <int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
    
    <int-amqp:outbound-gateway request-channel="output"
                                       amqp-template="amqpTemplate" exchange-name="fanout-exchange"
                                       reply-channel="reply"
                                       mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
    
    <int:channel id="reply"/>
    
    <int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
        <bean class="example.SimpleAggregator"/>
    </int:aggregator>
    
  • 共有2个答案

    琴俊良
    2023-03-14

    尽管这个问题已经有 3 年的历史了,但我会回答它,因为我有同样的问题。

    Spring Integrathtml" target="_blank">ion有一个Scatter-Gather的实现,听起来很像你最初的问题。

    下面是Spring文档中的相关部分

    它是一个复合endpoint,目标是向接收者发送消息并聚合结果……

    以前,可以使用分立元件配置模式,此增强功能带来了更方便的配置。

    ScatterGatherHandler是一个请求-答复endpoint,它结合了PublishSubscribeChannel(或RecipientListRouter)和AggregatingMessageHandler。请求消息被发送到分散通道,而ScatterGatherHandler等待来自聚合器的回复,以发送到输出通道。

    万铭
    2023-03-14

    问题是S. I.不知道扇出交换的拓扑。

    最简单的解决方法是使用自定义发布策略

    release-strategy-expression="size() == 2"
    

    在聚合器上(假设扇出为2)。所以,你不需要序列大小;您可以使用标题丰富器来避免“滥用”发布/订阅通道...

        <int:header-enricher input-channel="foo" output-channel="bar">
            <int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
        </int:header-enricher>
    

    您可以使用消息 ID 来避免创建新的 UUID,该消息 ID 已经是唯一的...

    <int:correlation-id expression="headers['id']" />
    

    最后,您可以通过添加

    mapped-request-headers="correlationId"
    

    到您的amqpendpoint。

     类似资料:
    • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

    • 在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我

    • 我一直在尝试使用RabbitMQ,但遇到了以下问题(与此非常类似:RabbitMQ中的主题交换与直接交换)。 我需要密集地广播大约800种类型的消息(因此每种消息类型都会有很多消费者),我想知道以下哪种方法更好: > 创建一个直接交换,在该交换中,消息将使用路由密钥(消息类型名称)发送,每个消费者都将通过绑定了相应路由密钥的临时队列连接到该交换。(因为没有像“key1.key2.*”这样复杂的路由

    • 我能够使用Publish/SubscribeRabbitMQ Java教程创建扇出交换,任何连接的使用者都将收到一个消息的副本。我想在连接任何使用者之前创建交换和绑定,而不是动态/编程地声明交换和绑定。我已经通过RabbitMQ管理控制台完成了这一点。然而,由于某种原因,我的消费者以循环方式接收消息,而不是全部接收消息的副本。我错过了什么?下面是一些代码片段: 发布者: 消费者: ...在Rabb

    • 这是我第一次在Java7下通过DSL配置Spring集成。因为我们知道Lambda表达式只在Java8下工作。所以我参考了Spring集成JavaDSL和Spring集成JavaDSL(Java8):逐行教程的例子,使我的配置如下收集相同资源的每100条消息发送到远程RESTful服务。 然而,配置对我不起作用,它会引发如下异常。 根本原因在o.s.i.u.MessagingMethodInvok

    • 我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。