当前位置: 首页 > 知识库问答 >
问题:

闪烁是否保存带有水印的关闭事件时间窗口的历史?

鞠嘉誉
2023-03-14

我有一个flink任务,它使用带事件时间和水印的键控翻滚窗口来聚合数据。

我的问题是,flink是否保持着他已经关闭的窗口的状态?否则,我没有其他解释为什么属于以前从未打开过的窗口的事件会打开一个窗口而不会立即删除它。

假设我们的窗口是1小时,禁止自动关闭是10分钟

让我们举个例子:

event1=("2022-01-01T08:25:00Z")=

event2=("2022-01-01T09:25:00Z")=

event3=("2022-01-01T05:25:00Z")=

event 4 =(" 2022-01-01t 05:40:00Z ")=

    val stream = env
      .fromSource(
        kafkaSource,
        WatermarkStrategy
          .forBoundedOutOfOrderness[(String, EnrichedProcess, KafkaHeaders)](Duration.ofMinutes(outOfOrdernessMinutes)) //Watermark max time for late events
          .withIdleness(Duration.ofSeconds(idleness))
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, EnrichedProcess, KafkaHeaders)] {
            override def extractTimestamp(element: (String, EnrichedProcess, KafkaHeaders), recordTimestamp: Long)
                : Long = {
              logger.info(
                LogMessage(
                  element._3.orgId,
                  s"Received incoming EnrichedProcess update_time: ${element._3.updateTime}, process time: ${recordTimestamp.asDate}",
                  element._3.flowId
                )
              )
              element._3.updateTime.asEpoch
            }
          }),
        s"Source - $kConsumeTopic"
      )

    stream
      .keyBy(element => (element._2.orgId -> element._2.procUid))                                                                     
      .window(TumblingEventTimeWindows.of(Time.hours(tumblingWindowHours), Time.minutes(windowStartingOffsetMinutes)))
      .reduce(new ReduceFunc)                                                                                         
      .name("Aggregated EnrichedProcess")
      .sinkTo(kafkaConnector.createKafkaSink(kProducerServers, kProduceTopic))
      .name(s"Sink -> $kProduceTopic")

编辑:我测试的方式是使用docker compose进行集成测试。我正在为Kafka制作一个事件=

当我在发送事件3和事件4之间放置30秒的睡眠时,事件4被丢弃了。这就是我所期望的行为。

    val producer = new Producer(producerTopic)

    val consumer = new Consumer(consumerTopic, groupId)
    producer.send(event1)
    producer.send(event2)
    Thread.sleep(30000)
    producer.send(event3)
    Thread.sleep(30000)
    producer.send(event4)

    val received: Iterable[(String, EnrichedProcess)] = consumer.receive[EnrichedProcess]()

但是现在更奇怪的是,为什么当我把睡眠时间设置为10秒而不是30秒时,我只收到第一种情况(水印应该已经更新了(定期水印生成器的默认值是200毫秒)

共有1个答案

拓拔俊德
2023-03-14

执行摘要:

使用Flink的基于事件时间的逻辑中的不确定性来自于处理时间与事件时间的混合——就像周期性水印生成器和空闲检测一样。只有在没有延迟事件或空闲源的情况下,才能确保确定性结果。

更多详情:

虽然你会期待

event3 = ("2022-01-01T05:25:00Z")

要迟到,只有当足够大的水印设法先到达时,它才会真正迟到。使用forBoundedOutOfOrness策略不能保证这一点——这是一个周期性的水印生成器,每200毫秒产生一次水印。因此,可能是基于event2的时间戳的水印在event3和event4之间产生。

这是一种可能的解释。根据具体情况,可能还有其他情况。例如,随着所有这些Hibernate状态的进行,水印生成器的一个并行度实例至少空闲了一分钟,这可能涉及产生正在观察到的行为(取决于空闲值等)。

更多背景:

其中平行度为

具有多个输入信道(例如键控窗口)的操作者将通过将(来自所有非空闲信道的)输入水印中的最小值作为他们自己的水印来组合这些水印。

回答原问题:

Flink是否保留已经关闭的窗口的状态?不会。一旦允许的延迟(如果有)过期,事件时间窗口的状态将被清除。

 类似资料:
  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 我是Flink的新手,需要方法的帮助。我有时间颗粒度为5分钟的事件流。我想通过调用rest API来获取事件的元数据,其中包含过去1小时数据点的历史事件,即过去12点(5分钟时间颗粒度)。 e、 g事件的时间戳为10:00、10:05、10:10、10:15等,因此如果我想获取时间戳为11:00的事件元数据,我将调用send发送所有时间戳为10:00、10:05、10:10、10:15的事件。。1

  • 我需要根据一个键连接两个事件源。事件之间的间隔最长可达1年(即具有id1的event1可能在今天到达,而来自第二个事件源的具有id1的相应event2可能在一年后到达)。假设我只想输出连接的事件输出。 我正在探索在RocksDB后端使用Flink的选项(我遇到了表API,它们似乎适合我的用例)。我找不到做这种长窗口连接的引用体系结构。我希望系统一天能处理大约2亿个事件。 关于处理这种长窗口连接的任

  • 我正在处理来自物联网设备的事件流。 这些事件具有由网络设置的第一级时间戳。他们还将在不同时间点采取的多项措施组合在一起。例如: 网络时间9:08 度量值将按小时汇总,在这种情况下,M1应在8:00-9:00窗口中,M2应在9:00-10:00窗口中。 我想知道设计我的flink应用程序、管理这些时间戳和相关水印的正确方法是什么。根据我目前的理解: 我可能应该将所有与网络时间(9:08)相关的处理放

  • 我有一个流系统,在这里我可以获得点击流数据。 数据格式: 我怎样才能做到这一点呢?基本上,我必须维护窗口中所有事件的状态,然后,一旦我获得事件,我必须从该状态获取价格。我并不要求任何工作解决方案,只是要求如何维护窗口中所有事件的状态。我也有一些自定义的Reduce操作。 在:我将2个事件数据加入到列表中。