当前位置: 首页 > 知识库问答 >
问题:

如何通过spring integration Aggregator将一条消息添加到多个组中

岳毅
2023-03-14

嗨,我们正在尝试流式处理金融市场数据,通过利用apache camel或Spring集成来计算交易信号。我们的一个用例是根据价格时间戳将连续价格汇总在一起,如下所示:

  • 输入

输入消息以时间序列中的(时间戳、价格)对的形式出现。假设值为,每对(TX,PX)是一条消息,而T表示时间戳,P表示价格值

(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)... 
  • 聚合

假设我们需要将每3条连续消息聚合在一起进行进一步计算,给定需要生成以下组的输入消息,每个3对组都是聚合消息:

[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....

正如您所看到的,大多数消息将聚合到多个组。是否有人可以建议是否有一种方法可以做到这一点,即使用当前的聚合器而不编写聚合器。

spring集成聚合分组似乎也基于相关键,因此消息需要映射到一组相关键。然而,当前的api似乎只允许我们生成一个关联键,这意味着每个消息只能聚合到一个组。这方面有什么解决办法吗。

附笔。

在阅读了camel的源代码之后,camel似乎无法支持我们的需求。试试Spring的运气吧。交叉手指骆驼问题

共有1个答案

张逸清
2023-03-14

我们没有任何现成的东西,但我可以通过对SimpleMessageStore进行一个小小的修改来实现您的愿望。我已经在摘要中发布了完整的RollingMessageStore。

底线是修改removeGroup,只删除第一条消息,而不是整个组。此外,将completeGroup设置为no-op。

设置expreGroupOnComplection以强制聚合器“删除”组(通过调用修改后的RemoveGroup()方法。

这是SimpleMessageGroupRollingMessageGroup之间的区别...

182,184c190,194
< 
<               groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
<               groupIdToMessageGroup.remove(groupId);
---
>               Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
>               if (message != null) {
>                   this.groupUpperBound.release(1);
>                   this.removeMessageFromGroup(groupId, message);
>               }

(加上删除完整组()中的所有代码。

还有一个测试用例。。。

@Test
public void testRolling() {
    AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setReleaseStrategy(new ReleaseStrategy() {

        @Override
        public boolean canRelease(MessageGroup group) {
            return group.size() == 3;
        }
    });
    QueueChannel replyChannel = new QueueChannel();
    Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
    Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
    Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
    Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
    Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

    aggregator.handleMessage(message1);
    aggregator.handleMessage(message2);
    aggregator.handleMessage(message3);
    aggregator.handleMessage(message4);
    aggregator.handleMessage(message5);

    Message<?> reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 105);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 315);
    reply = replyChannel.receive(10000);
    assertNotNull(reply);
    assertEquals(reply.getPayload(), 693);
}

请继续并打开JIRA的新功能问题,我们将考虑将此(或更通用的解决方案)添加到即将发布的3.0版本中。

使用相关策略表达式=“foo”和

发布-策略-表达式=size()==3

 类似资料:
  • 问题内容: 我有一个 JFrame 。 我也有一个 Box 类,它扩展了 Component 。该box类具有一个 paint 方法,该方法可以创建一个填充的矩形。 当我将这些Box组件的多个添加到我的JFrame时,当我在JFrame上调用 重绘 时,仅显示最近添加的一个。 我看了一下布局管理器,但是我不确定那不是我想要的。我所希望的是能够在屏幕上的任何位置制作整个矩形的动画。 (我还尝试创建一

  • 问题内容: 我需要初始化一个常量HashMap,并希望在一行语句中完成它。避免这样的事情: 类似于目标C: 看了这么多,我还没有找到任何显示如何做到这一点的例子。 问题答案: 您可以使用 Double Brace初始化 ,如下所示: 作为警告,请参阅Java的效率效率“ Double BraceInitialization”,了解它可能带来的性能影响。

  • 问题内容: 我正在使用SQL LOADER在一个表中加载多个csv文件。我发现的过程非常简单,就像 但是我不想多次使用INFILE,因为如果我有1000个以上的文件,那么我必须在控制文件脚本中提及1000次INFILE。 所以我的问题是:是否有其他方法(如任何循环/任何* .csv)加载多个文件而不使用多个infile? 谢谢你 问题答案: 解决方案1:您可以将1000个文件串联到一个大文件中,然

  • 我想合并/添加一个新泽西项目B(已经运行良好)到一个新泽西项目a,这将充当一个过滤器/安全层。因此,作为一个基本步骤,我在项目a的构建路径上向项目B添加了依赖项,并在构建路径中向部署程序集添加了相同的依赖项。我从这篇文章中了解到,我可以通过将servlet放在同一个中,并使用以不同的方式映射它们来实现这一点。当我试图访问项目B的资源时,我没有任何运气。 因此,当我尝试访问时,它工作得很好。但是当我

  • 我想在github中添加一个颤动插件作为另一个颤动项目的依赖关系。 我遵守了守则https://flutter.dev/docs/development/packages-and-plugins/using-packages#dependencies-在未发布的包上添加了ref标记https://dart.dev/tools/pub/dependencies#git-包裹 我希望获取此分支(mas

  • 问题内容: 通过Rabbitmq中的示例,消费者可以一次从队列中获取所有消息。如何使用一条消息并退出? 问题答案: 您必须声明basicQos设置,才能一次从ACK到NACK状态获取一条消息,并禁用自动ACK以便显式给出确认。 希望能帮助到你!