我正在构建以下Kafka Streams拓扑(伪代码):
gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();
如果您注意到了,这是一个菱形拓扑,从单个输入主题开始,到单个输出主题结束,消息流经两个并行流,最终在最后连接在一起。一个流适用(翻滚?)开窗,另一个没有。流的两个部分都在同一个键上工作(除了窗口化中间引入的WindowedKey之外)。
>
~40k条相同键的记录首先上传到输入主题中
正如预期的那样,输出主题中有~40K的更新
另有约40k条与步骤1)键相同但不同的记录被上传到输入主题中
我还注意到,如果将streamsconfig.topology_optimization
设置为streamsconfig.optimization
而不设置它,则调试器中至少第一次在前面的reduce()(或aggregate())处理程序之前调用mapValues()处理程序。我没想到。
不幸的是,尝试了join()和leftJoin(),结果相同。在调试器中,数据的第二部分根本不触发“左”流中的reduce()处理程序,而是触发“右”流中的reduce()处理程序。
根据我的配置,如果两个数据集中的记录数都是100条,那么问题就不会出现,我会得到200条输出消息。当我将每个数据集中的数字提高到200时,我得到的预期消息不到400条。因此,现在看起来像“旧”窗口这样的东西被删除了,这些旧窗口的新记录被流忽略了。可以设置窗口保留设置,但使用它的默认值时,我希望窗口保持它们的状态,并至少保持12个小时(这大大超过了我的单元测试运行时间)。
试图用以下窗口存储配置修改左侧精简程序:
Materialized.as(
Stores.inMemoryWindowStore(
"rollup-left-reduce",
Duration.ofDays(5 * 365),
Duration.ofHours(1), false)
)
结果仍然没有区别。
即使只有一个“left”流而没有“right”流并且没有join(),同样的问题仍然存在。问题似乎出在我的设置的窗口保留设置中。我的输入记录的时间戳(事件-时间)跨度为2年。第二个数据集再次从2年初开始。Kafka Streams中的这个位置确保忽略第二个数据集记录:
经过一段时间的调试,我找到了问题的原因。
我的输入数据集包含时间戳跨度为2年的记录。我正在加载第一个数据集,并将流的“观察”时间设置为来自输入数据集的最大时间戳。
第二个数据集的上载以时间戳比新的观察时间早2年的记录开始,会导致内部流删除消息。如果将Kafka日志记录设置为跟踪级别,可以看到这一点。
.windowedBy(TimeWindows.of(windowSize))
.windowedBy(TimeWindows.of(windowSize).grace(Duration.ofDays(5 * 365)))
Materialized.as(
Stores.inMemoryWindowStore(
"rollup-left-reduce",
Duration.ofDays(5 * 365),
windowSize, false)
)
就是这样,产量如预期。
我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi
我有这个脚本: HTML中的用法: 由于某些原因,它的工作,如果页面重新加载一半,但不,它没有开火,在它工作之前,所以我不知道发生了什么。我在wordpress网站上使用这个。
我对Apache Storm的性能有一个问题,主要是从喷口出来的。 我有一个从kestrel队列发出项目的拓扑。我获取大约2000个项目,每次在喷注中调用时,我都会发出一个。 我正在使用1个spout任务和1个spout执行器运行。我已将设置为10。 为什么每次调用之间有这么大的时间间隔?outputCollector在发出一个新元组之前是否正在等待听到每个元组的反馈? 我正在运行Java8和st
我正在运行一个3节点的Storm集群。我们正在提交一个包含10个工作者的拓扑结构,以下是拓扑结构的详细信息 我们每天处理800万到1000万个数据。问题是topolgy只运行了2到3天,而我们在kafka spout中看到了一些失败的元组,没有处理任何消息。当提交新的topolgy时,它工作良好,但在2到3天后,我们又看到了同样的问题。有人能给我们一个解决方案吗。下面是我的storm配置
我们有一个不想连续运行storm拓扑的用例。相反,有一组输入(10K+)应该在指定的时间被处理,Spout连续发射这些输入,并得到拓扑中其余螺栓的处理。处理完所有输入后,在我的喷注中就没有任何东西可以从nextTuple发出。 此时,我们希望拓扑进入Hibernate状态,并在每天晚上12:00重新启动进程。 在storm配置中是否有任何属性可以设置为每天运行一次拓扑并在处理完成后Hibernat
我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。