我在应用程序中使用拆分器聚合器模式。我有以下配置-
<int:splitter inputChannel="CH1" outputChannel="CH2" ref="foo" method="bar"/>
<int:aggregator inputChannel="CH2" outputChannel="CH3" ref="oof" method="rab" ---other required configurations/>
<task:scheduler --with threadpool of size 30 />
<!--there are other channels as well. Some are queue and some are direct channels-->
我的所有通道(CH1、CH2、CH3)都是QueueChannel
。Splitter输入通道CH1的源代码是一个文件。
在我的测试中,我观察到即使在CH1通道中添加两个文件,在给定时间也只有一个文件被处理。所以我在我的CH1通道中添加了一个轮询器,现在正在同时处理CH1通道上的多个输入消息。
在聚合器方面,我也注意到执行总是单线程的,即直到第一个线程完成执行,第二个线程才开始执行。
到目前为止,我试图解决这个问题-
有人能帮我把聚合器处理成多线程的吗。
Update1我想做什么:
我接收包含库存数据的文件。我为每个文件创建消息并将其放入CH1通道。CH1通道有一个拆分器,可以将其拆分为单个记录并将其发送到CH2通道。CH2通道有一个与之关联的聚合器,它将尝试根据文件名聚合记录,并“oof.rab”将记录写入数据库。我的期望是看到同一文件的多批次同时写入数据库。
Update2:这里进行聚合以批处理行,以便我们可以以批处理模式将其写入数据库,而不是单独写入行
聚合器是多线程的,但仅适用于不同的消息组(相关性)。每个组都必须是单线程的,因为对于每个消息到达,我们都必须咨询ReleaseStrategy。
我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只
目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I
我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。
我一直在使用Spring boot,并且在我的项目中删除了所有文件。不幸的是,它还使用了Spring集成,根据我的经验,Spring集成非常基于。 我有一个场景,要求我有一个聚合器,并让聚合器每隔秒轮询一次。 这可以像这样使用XML来完成(从以前的SO问题中获得的示例): 我已经找到了一个类,这个类有点像是在玩把戏,它的bean定义是: 但是,只有在与此处理程序关联的中接收到新消息时,才触发的方法
在我的用例中,最简单的集成组件安排是什么: 接收来自多个来源和多种格式的消息(所有消息都是JSON序列化对象)。 将消息存储在缓冲区中最多10秒(聚合) 通过不同的类属性getter(例如class1.someId(),class2.otherId(),...) 释放所有分组的消息并转换为新的聚合消息。 到目前为止(第1点和第2点),我正在使用聚合器,但不知道3)处的问题是否有现成的解决方案或者我
我们的应用程序中存在以下问题。消息通过入站通道适配器传入,并使用持久消息存储在聚合器中累积。一旦释放策略中定义的条件返回true,消息将被发送到处理的下一阶段。如果在下一个处理阶段抛出异常,事务将回滚,消息将再次放入持久消息存储中。但是,事务不会将消息放回原始队列,因为消息一旦放在聚合器中就会被确认。这不是我们想要的。理想情况下,如果在处理聚合器已批处理的其中一条下游消息时发生异常,则事务只会回滚