当前位置: 首页 > 知识库问答 >
问题:

Flink流式事件时间窗口排序

萧飞
2023-03-14
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)

共有1个答案

程博学
2023-03-14

这种行为的原因是在Flink中没有考虑元素的顺序(相对于时间戳)。只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作才是重要的,因为水印通常在基于时间的操作中触发计算。

在您的示例中,window运算符将源中的所有元素存储在内部窗口缓冲区中。然后,源发出一个水印,表示未来不会有时间戳较小的元素到达。这又告诉window操作符处理结束时间戳低于水印的所有窗口(这对所有窗口都是真的)。因此,它发出所有的窗口(任意排序),然后它发出一个水印本身。下游的操作将自己接收这些元素,并且一旦接收到水印就可以进行处理。

默认情况下,从源发出水印的间隔为200 ms。由于源发出的元素数量很少,所有元素都会在第一个水印发出之前发出。在一个实际的用例中,水印发射间隔比窗口大小小得多,您将得到窗口按照时间戳的顺序发射的预期行为。例如,如果每500毫秒有1小时窗口和水印。

 类似资料:
  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢

  • 为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 我的Flink工作必须在每次工作轮班后计算某个集合。换挡是可配置的,看起来类似于: 出于操作目的,每天的班次都是一样的,一周/一年中的几天之间没有区别。轮班配置可以随时间变化,并且可以不单调,因此表中留下了一个简单的EventTime窗口,如:,因为一些轮班可能会缩小或超时,或者在中间插入几个小时... 我想出了一些基于GlobalWindow和自定义触发器的东西: 在我的自定义触发器中,我尝试识

  • 当我使用 flink 事件时间窗口时,窗口只是不触发。如何解决问题,有没有办法调试?