当前位置: 首页 > 知识库问答 >
问题:

Spring集成轮询聚合器(以编程方式)

郝君博
2023-03-14

我一直在使用Spring boot,并且在我的项目中删除了所有XML文件。不幸的是,它还使用了Spring集成,根据我的经验,Spring集成非常基于XML

我有一个场景,要求我有一个聚合器,并让聚合器每隔x秒轮询一次。

这可以像这样使用XML来完成(从以前的SO问题中获得的示例):

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
    send-partial-result-on-expiry="true"
    group-timeout="60000"
    correlation-strategy-expression="T(Thread).currentThread().id"
    release-strategy-expression="size() == 100">
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>

我已经找到了一个类,这个类有点像是在玩把戏,它的bean定义是:

@Bean(name = "aggregatingMessageHandler")
public AggregatingMessageHandler aggregatingMessageHandler() {

    AggregatingMessageHandler aggregatingMessageHandler =
            new AggregatingMessageHandler(messageGroupProcessorBean(),
                    new SimpleMessageStore(10));

 aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean());

    aggregatingMessageHandler.setReleaseStrategy(
            new TimeoutCountSequenceSizeReleaseStrategy(3,
                    TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT));

    aggregatingMessageHandler.setExpireGroupsUponCompletion(true);

    aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean());

    return aggregatingMessageHandler;
}

但是,只有在与此处理程序关联的inboundChannel中接收到新消息时,才触发ReleaseStrategycanRelease()方法,而不是在固定的时间间隔内(这不是期望的结果)。我希望将超过一分钟的所有组重定向到输出通道。我的问题是——有没有一种方法可以通过编程方式附加一个轮询器,比如XML定义中的轮询器?

共有1个答案

仲涵亮
2023-03-14

对于Java

Aggregator组件具有AggregatorFactoryBean,以便更容易Java配置。

无论如何,您必须注意,在处理程序定义上有一个@ServiceActivator注释和一个@Bean。而且,@ServiceActivator具有轮询器属性。

还要注意,Spring集成有JavaDSL。

你问题的另一部分是有点困惑。轮询器完全与发布策略无关。在这种情况下,它负责从PollableChannel接收消息,即logEntryChannel。并且只有在这之后,已经轮询的消息才会被放置到聚合器中,以便进行关联和发布逻辑。

该示例中所做的是完全不同的故事,我们可以在单独的SO线程中讨论它。

 类似资料:
  • 我想使用聚合器从两条消息中创建一条消息,但我不知道如何做到这一点。 目前,我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。 我的整个项目是这样的: 读入。拉链- 如果我可以在解压缩文件后发送一条包含两个有效负载的消息,那就太好了,但在读取后聚合就足够了。 我的拉链看起来像这样: 它将这些文件放入两个目录中,我使用FileReadingMessageSource再次从中读取它们。我还想只

  • 我正在通过下面的链接来找出组合和聚合之间的区别。 https://www . geeks forgeeks . org/association-composition-aggregation-Java/ 我能够理解,组合意味着一种关系,其中子项不能独立于父项而存在,而聚合意味着子项可以独立于父项存在的关系。但是无法理解我如何以编程方式区分它.下面是链接中给出的聚合和组合示例。在这两种情况下,类在结

  • 目前,我正在与spring integration合作开发新的应用程序,并启动了poc,以了解如何处理故障案例。在我的应用程序中,spring integration将接收来自IBM mq的消息,并根据消息类型验证头信息和到不同队列的路由。传入的消息可能是批量消息,所以我使用了spring integration的拆分器和聚合器,并且对技术工作流程有很好的进展和控制。目前我面临的问题很少,我们有I

  • 我有以下应用程序要求: 从RabbitMq接收消息,然后根据一些更复杂的规则进行聚合,例如基于属性(具有预先给定的类型时间映射)和基于消息在队列中等待的现有时间(属性) 正如您在图中看到的一个用例:三条消息已经聚合并等待下一秒发布(因为当前速率为),但就在那时,以到达,并更新了,使其成为优先级最高的第一条消息。因此,在下一个选项中,我们不再发布聚合3,而是发布聚合2,因为它现在具有更高的优先级。

  • 我在应用程序中使用拆分器聚合器模式。我有以下配置- 我的所有通道(CH1、CH2、CH3)都是。Splitter输入通道CH1的源代码是一个文件。 在我的测试中,我观察到即使在CH1通道中添加两个文件,在给定时间也只有一个文件被处理。所以我在我的CH1通道中添加了一个轮询器,现在正在同时处理CH1通道上的多个输入消息。 在聚合器方面,我也注意到执行总是单线程的,即直到第一个线程完成执行,第二个线程

  • 我有以下XML负载,我正试图将其用于Spring集成和Spring集成AMQP: 我正在使用xpath拆分器拆分消息: 我工作正常,消息被分成3条新消息,例如使用此有效负载: 在此步骤之后,将使用此设置聚合消息: 作为最后一步,消息将使用此出站通道适配器发送到交换机: 不幸的是,出现了一些问题,因为我最终得到了这样的有效载荷。我需要它保持XML格式。