当前位置: 首页 > 知识库问答 >
问题:

闪烁水印和触发器-迟来的元素没有在事件时间丢弃?

端木令雪
2023-03-14

我对Flink在事件时间上加水印时如何处理后期元素有些困惑。

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import org.apache.log4j.{Level, Logger}

object EventTimeExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  case class ExampleType(time: Long, value: Long)

  def main(args: Array[String]) {

    // Set up environment
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Example S3 path
    val simple = env.fromCollection(Seq(
      ExampleType(1525132800000L, 1),
      ExampleType(1525132800000L, 2) ,
      ExampleType(1525132920000L, 3),
      ExampleType(1525132800000L, 4)
    ))
      .assignAscendingTimestamps(_.time)

    val windows = simple
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
      .apply{
       (window, iter, collector: Collector[(Long, Long, String)]) => {
        collector.collect(window.getStart, window.getEnd, iter.map(_.value).toString())
      }
    }

    windows.print
    env.execute("TimeStampExample")
  }
}
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

现在我认识到这是一个微不足道的例子,但不理解这会使理解更复杂的流变得困难。

共有1个答案

万俟炯
2023-03-14

你的理解基本上是正确的,但是这里还发生了几件事情需要考虑进去。

首先,您使用了assignAscendingTimestamps(),它只能在事件流(按时间戳)完全按顺序排列的情况下使用,而这里的情况并非如此。运行此应用程序时应看到此警告:

WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1525132800000 < 1525132920000

这里起作用的另一个因素是AscendingTimeStampExtractor不会更新每个经过的流元素的当前水印。这是一个周期性水印生成器的示例,它将每隔n毫秒将水印注入流中,其中n由ExecutionConfig.SetAutoWaterMarkInterval(...)定义,默认为200毫秒。这就是事件#4潜入第一个窗口的方式。

要获得您期望的结果,您可以实现一个标点符号水印生成器,该生成器配置为为每个事件生成水印:

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[ExampleType] {
  override def extractTimestamp(element: ExampleType, previousElementTimestamp: Long): Long = {
    element.time
  }

  override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp)
  }
}

然后您可以这样使用它:

val simple = env.fromCollection(Seq(
  ExampleType(1525132800000L, 1),
  ExampleType(1525132800000L, 2) ,
  ExampleType(1525132920000L, 3),
  ExampleType(1525132800000L, 4)
))
  .assignTimestampsAndWatermarks(new PunctuatedAssigner)

现在您的示例产生了以下结果:

(1525132800000,1525132860000,List(1, 2))
(1525132920000,1525132980000,List(3))
override def checkAndGetNextWatermark(lastElement: ExampleType, extractedTimestamp: Long): Watermark = {
  new Watermark(extractedTimestamp - 200000)
}
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))
val windows = simple
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(60)))
  .allowedLateness(Time.seconds(200))
  ...

这将导致第一个窗口触发两次:

(1525132800000,1525132860000,List(1, 2))
(1525132800000,1525132860000,List(1, 2, 4))
(1525132920000,1525132980000,List(3))

请注意,由于处理水印会带来一些开销,因此通常不希望以这种方式使用标点符号水印(每个事件都有一个水印)。对于大多数应用程序,基于boundedoutofordernestimestampextractor的周期性水印是更好的选择。

 类似资料:
  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 当特定时间过去后,如何在Flink中设置触发器以执行某些操作?每天下午1点的流量总和

  • 我正在处理来自物联网设备的事件流。 这些事件具有由网络设置的第一级时间戳。他们还将在不同时间点采取的多项措施组合在一起。例如: 网络时间9:08 度量值将按小时汇总,在这种情况下,M1应在8:00-9:00窗口中,M2应在9:00-10:00窗口中。 我想知道设计我的flink应用程序、管理这些时间戳和相关水印的正确方法是什么。根据我目前的理解: 我可能应该将所有与网络时间(9:08)相关的处理放

  • 我有一个flink任务,它使用带事件时间和水印的键控翻滚窗口来聚合数据。 我的问题是,flink是否保持着他已经关闭的窗口的状态?否则,我没有其他解释为什么属于以前从未打开过的窗口的事件会打开一个窗口而不会立即删除它。 假设我们的窗口是1小时,禁止自动关闭是10分钟 让我们举个例子: event1=("2022-01-01T08:25:00Z")= event2=("2022-01-01T09:2

  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我在Lollipop上的共享元素转换中看到了奇怪的事情。共享元素在开始动画之前闪烁(请看视频https://www.youtube.com/watch?v=DCoyyC_S-9A) 我不知道为什么会这样。但是,当我添加