我正在尝试使用Spring集成配置以下内容:
到目前为止,我有一些问题。。。
>
我使用发布订阅通道来设置apply sequence=“true”
属性,以便correlationId、sequenceSize
sequenceSize 属性仅设置为 1,即使在扇出交换中注册了 2 个队列也是如此。据推测,这意味着消息将过早地从聚合器中释放出来。我希望这是因为我滥用了发布-订阅通道来使用 apply-sequence=“true”
,并且非常正确地说只有一个订阅者,int-amqp:outbound-gateway
。
我的出站Spring配置如下:
<int:publish-subscribe-channel id="output" apply-sequence="true"/>
<int:channel id="reply">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:aggregator input-channel="reply" method="combine">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
<int:logging-channel-adapter id="logger" level="INFO"/>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-channel="reply"/>
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"/>
我的 rabbitMQ 配置如下:
<rabbit:connection-factory id="connectionFactory" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="-1" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="a-queue"/>
<rabbit:queue name="b-queue"/>
<rabbit:fanout-exchange name="fanout-exchange">
<rabbit:bindings>
<rabbit:binding queue="a-queue" />
<rabbit:binding queue="b-queue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
消费者看起来像这样:
<int:channel id="input"/>
<int-amqp:inbound-gateway request-channel="input" queue-names="a-queue" connection-factory="connectionFactory" concurrent-consumers="1"/>
<bean id="listenerService" class="example.ListenerService"/>
<int:service-activator input-channel="input" ref="listenerService" method="receiveMessage"/>
任何建议都很好,我怀疑我在某个地方拿错了棍子的一端…
基于 Gary 评论的新出站 Spring 配置:
<int:channel id="output"/>
<int:header-enricher input-channel="output" output-channel="output">
<int:correlation-id expression="headers['id']" />
</int:header-enricher>
<int:gateway id="senderGateway" service-interface="example.SenderGateway" default-request-channel="output" default-reply-timeout="5000" default-reply-channel="reply" />
<int-amqp:outbound-gateway request-channel="output"
amqp-template="amqpTemplate" exchange-name="fanout-exchange"
reply-channel="reply"
mapped-reply-headers="amqp*,correlationId" mapped-request-headers="amqp*,correlationId"/>
<int:channel id="reply"/>
<int:aggregator input-channel="reply" output-channel="reply" method="combine" release-strategy-expression="size() == 2">
<bean class="example.SimpleAggregator"/>
</int:aggregator>
尽管这个问题已经有 3 年的历史了,但我会回答它,因为我有同样的问题。
Spring Integrathtml" target="_blank">ion有一个Scatter-Gather的实现,听起来很像你最初的问题。
下面是Spring文档中的相关部分
它是一个复合endpoint,目标是向接收者发送消息并聚合结果……
以前,可以使用分立元件配置模式,此增强功能带来了更方便的配置。
ScatterGatherHandler是一个请求-答复endpoint,它结合了PublishSubscribeChannel(或RecipientListRouter)和AggregatingMessageHandler。请求消息被发送到分散通道,而ScatterGatherHandler等待来自聚合器的回复,以发送到输出通道。
问题是S. I.不知道扇出交换的拓扑。
最简单的解决方法是使用自定义发布策略
release-strategy-expression="size() == 2"
在聚合器上(假设扇出为2)。所以,你不需要序列大小;您可以使用标题丰富器来避免“滥用”发布/订阅通道...
<int:header-enricher input-channel="foo" output-channel="bar">
<int:correlation-id expression="T(java.util.UUID).randomUUID().toString()" />
</int:header-enricher>
您可以使用消息 ID 来避免创建新的 UUID,该消息 ID 已经是唯一的...
<int:correlation-id expression="headers['id']" />
最后,您可以通过添加
mapped-request-headers="correlationId"
到您的amqpendpoint。
我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只
在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我
我一直在尝试使用RabbitMQ,但遇到了以下问题(与此非常类似:RabbitMQ中的主题交换与直接交换)。 我需要密集地广播大约800种类型的消息(因此每种消息类型都会有很多消费者),我想知道以下哪种方法更好: > 创建一个直接交换,在该交换中,消息将使用路由密钥(消息类型名称)发送,每个消费者都将通过绑定了相应路由密钥的临时队列连接到该交换。(因为没有像“key1.key2.*”这样复杂的路由
我能够使用Publish/SubscribeRabbitMQ Java教程创建扇出交换,任何连接的使用者都将收到一个消息的副本。我想在连接任何使用者之前创建交换和绑定,而不是动态/编程地声明交换和绑定。我已经通过RabbitMQ管理控制台完成了这一点。然而,由于某种原因,我的消费者以循环方式接收消息,而不是全部接收消息的副本。我错过了什么?下面是一些代码片段: 发布者: 消费者: ...在Rabb
这是我第一次在Java7下通过DSL配置Spring集成。因为我们知道Lambda表达式只在Java8下工作。所以我参考了Spring集成JavaDSL和Spring集成JavaDSL(Java8):逐行教程的例子,使我的配置如下收集相同资源的每100条消息发送到远程RESTful服务。 然而,配置对我不起作用,它会引发如下异常。 根本原因在o.s.i.u.MessagingMethodInvok
我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。