当前位置: 首页 > 知识库问答 >
问题:

如何使用Flink对无序事件时间流进行排序

滕鸿畴
2023-03-14

这个问题涵盖了如何使用FlinkSQL对乱序流进行排序,但我更愿意使用DataStream API。一种解决方案是使用ProcessFunction来执行此操作,该ProcessFunction使用PriorityQueue来缓冲事件,直到水印指示它们不再乱序,但这在RocksDB状态后端中表现不佳(问题是每次访问PriorityQueue都需要整个PriorityQueue的ser/de)。无论使用哪个状态后端,我如何有效地做到这一点?

共有1个答案

景星光
2023-03-14

更好的方法(这或多或少是由Flink的SQL和CEP库内部完成的)是在MapState中缓冲无序流,如下所示:

如果要独立地对每个键进行排序,则首先为流设置键。否则,对于全局排序,请按常量对流进行键控,以便可以使用KeyedProcessFunction实现排序。

在该过程函数open方法中,实例化一个MapState对象,其中键是时间戳,值是具有相同时间戳的流元素列表。

在OneElement方法中:

  • 如果事件延迟,请将其丢弃或发送到侧面输出

调用onTimer时,该时间戳映射中的条目就可以作为已排序流的一部分发布了,因为当前水印现在指示应该已经处理了所有早期事件。在向下游发送事件后,不要忘记清除地图中的条目。

 类似资料:
  • 我正在学习Flink,我从使用DataStream的简单字数统计开始。为了增强处理能力,我过滤了输出,以仅显示找到3个或更多单词的结果。 我想创建一个WindowFunction,根据找到的单词值对输出进行排序。我试图实现的WindowFunction根本不编译。我正在努力定义WindowFunction接口的apply方法和参数。

  • 我正在使用java lambda对列表进行排序。 我怎样才能反向排序呢?

  • 问题内容: 我的清单包含大小等的集合。我尝试这样做,但似乎不起作用。 我想要的最终结果是。 我可以尝试添加在所有的元素和那种出来再做出新的的。但是,有某种班轮吗? 更新: 这可行,但是可以简化吗? 问题答案: @Eugene的回答很甜蜜,因为番石榴很甜。但是,如果您碰巧在类路径中没有番石榴,这是另一种方式: 首先,我将所有集合映射到一个流中,然后对所有元素进行排序,最后,将整个排序后的流收集到集合

  • 为了测试流处理和Flink,我给自己出了一个看似简单的问题。我的数据流由粒子的和坐标以及记录位置的时间组成。我的目标是用特定粒子的速度来注释这个数据。所以小溪看起来像这样。 现在无法保证事件会按顺序到达,即可能会在之前到达,即。 为了简单起见,可以假设任何迟来的数据将在早数据的内到达。 我承认,我是流处理和闪烁的新手,所以这可能是一个愚蠢的问题,提出一个明显的答案,但我目前被难倒了,如何去实现我的