嗨,我们正在尝试流式处理金融市场数据,通过利用apache camel或Spring集成来计算交易信号。我们的一个用例是根据价格时间戳将连续价格汇总在一起,如下所示:
输入消息以时间序列中的(时间戳、价格)对的形式出现。假设值为,每对(TX,PX)是一条消息,而T表示时间戳,P表示价格值
(T0,P1),(T1,P1),(T2,P2),(T3,P3),(T4,P4)...
假设我们需要将每3条连续消息聚合在一起进行进一步计算,给定需要生成以下组的输入消息,每个3对组都是聚合消息:
[(T0,P1),(T1,P1),(T2,P2)],
[(T1,P1),(T2,P2),(T3,P3)],
[(T2,P2),(T3,P3),(T4,P4)],
....
正如您所看到的,大多数消息将聚合到多个组。是否有人可以建议是否有一种方法可以做到这一点,即使用当前的聚合器而不编写聚合器。
spring集成聚合分组似乎也基于相关键,因此消息需要映射到一组相关键。然而,当前的api似乎只允许我们生成一个关联键,这意味着每个消息只能聚合到一个组。这方面有什么解决办法吗。
附笔。
在阅读了camel的源代码之后,camel似乎无法支持我们的需求。试试Spring的运气吧。交叉手指骆驼问题
我们没有任何现成的东西,但我可以通过对SimpleMessageStore进行一个小小的修改来实现您的愿望。我已经在摘要中发布了完整的RollingMessageStore。
底线是修改removeGroup,只删除第一条消息,而不是整个组。此外,将completeGroup设置为no-op。
设置expreGroupOnComplection
以强制聚合器“删除”组(通过调用修改后的RemoveGroup()
方法。
这是SimpleMessageGroup
和RollingMessageGroup
之间的区别...
182,184c190,194
<
< groupUpperBound.release(groupIdToMessageGroup.get(groupId).size());
< groupIdToMessageGroup.remove(groupId);
---
> Message<?> message = this.groupIdToMessageGroup.get(groupId).getOne();
> if (message != null) {
> this.groupUpperBound.release(1);
> this.removeMessageFromGroup(groupId, message);
> }
(加上删除完整组()
中的所有代码。
还有一个测试用例。。。
@Test
public void testRolling() {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
aggregator.setExpireGroupsUponCompletion(true);
aggregator.setReleaseStrategy(new ReleaseStrategy() {
@Override
public boolean canRelease(MessageGroup group) {
return group.size() == 3;
}
});
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);
aggregator.handleMessage(message1);
aggregator.handleMessage(message2);
aggregator.handleMessage(message3);
aggregator.handleMessage(message4);
aggregator.handleMessage(message5);
Message<?> reply = replyChannel.receive(10000);
assertNotNull(reply);
assertEquals(reply.getPayload(), 105);
reply = replyChannel.receive(10000);
assertNotNull(reply);
assertEquals(reply.getPayload(), 315);
reply = replyChannel.receive(10000);
assertNotNull(reply);
assertEquals(reply.getPayload(), 693);
}
请继续并打开JIRA的新功能问题,我们将考虑将此(或更通用的解决方案)添加到即将发布的3.0版本中。
使用相关策略表达式=“foo”和
发布-策略-表达式=size()==3
。
问题内容: 我有一个 JFrame 。 我也有一个 Box 类,它扩展了 Component 。该box类具有一个 paint 方法,该方法可以创建一个填充的矩形。 当我将这些Box组件的多个添加到我的JFrame时,当我在JFrame上调用 重绘 时,仅显示最近添加的一个。 我看了一下布局管理器,但是我不确定那不是我想要的。我所希望的是能够在屏幕上的任何位置制作整个矩形的动画。 (我还尝试创建一
问题内容: 我需要初始化一个常量HashMap,并希望在一行语句中完成它。避免这样的事情: 类似于目标C: 看了这么多,我还没有找到任何显示如何做到这一点的例子。 问题答案: 您可以使用 Double Brace初始化 ,如下所示: 作为警告,请参阅Java的效率效率“ Double BraceInitialization”,了解它可能带来的性能影响。
问题内容: 我正在使用SQL LOADER在一个表中加载多个csv文件。我发现的过程非常简单,就像 但是我不想多次使用INFILE,因为如果我有1000个以上的文件,那么我必须在控制文件脚本中提及1000次INFILE。 所以我的问题是:是否有其他方法(如任何循环/任何* .csv)加载多个文件而不使用多个infile? 谢谢你 问题答案: 解决方案1:您可以将1000个文件串联到一个大文件中,然
我想合并/添加一个新泽西项目B(已经运行良好)到一个新泽西项目a,这将充当一个过滤器/安全层。因此,作为一个基本步骤,我在项目a的构建路径上向项目B添加了依赖项,并在构建路径中向部署程序集添加了相同的依赖项。我从这篇文章中了解到,我可以通过将servlet放在同一个中,并使用以不同的方式映射它们来实现这一点。当我试图访问项目B的资源时,我没有任何运气。 因此,当我尝试访问时,它工作得很好。但是当我
我想在github中添加一个颤动插件作为另一个颤动项目的依赖关系。 我遵守了守则https://flutter.dev/docs/development/packages-and-plugins/using-packages#dependencies-在未发布的包上添加了ref标记https://dart.dev/tools/pub/dependencies#git-包裹 我希望获取此分支(mas
我有一个程序,它创建一个套接字(服务器和客户端程序),并使用该套接字通过TCP端口发送消息。我的问题是,我如何交换多条消息?每次我发送消息时,端口都会关闭,我需要使用另一个端口发送另一条消息。 例如,我必须从客户端向服务器发送2个数字,服务器需要回复我发送的数字的总和。如何实现在同一端口上发送未定义的数字甚至2个数字? 以下是代码(相当标准的东西): 服务器: 客户: 例如,我运行代码并为服务器端