当前位置: 首页 > 知识库问答 >
问题:

Flink窗口和后期事件

邓俊材
2023-03-14

我有一个用例,需要以不同的方式处理延迟事件和正常事件:如果事件在其窗口关闭后到达,则应将其发送到另一个路径。

我想是这样的。sideOutputLateData(..) 可以帮我解决这个问题。在正常情况下(即使用真实世界的数据)也是如此。但如果我想用伪造的数据来测试它,它就会停止工作。

我希望类似于:

val env = StreamExecutionEnvironment.createLocalEnvironment()
env.setParallelism(1)

val events: DataStream[(Int, Long)] = env.fromElements(
  (1, 1),
  (1, 15),
  (1, 25),
  (1, 8) //late Event
)
val lateEvents = OutputTag[(Int, Long)]("lateEvents")

val windowedSum = events
  .assignAscendingTimestamps(e => e._2)
  .windowAll(TumblingEventTimeWindows.of(time.Time.milliseconds(10)))
  .sideOutputLateData(lateEvents)
  .sum(position=0)

val lateEventsStream = windowedSum
  .getSideOutput(lateEvents)
  // Handle differently
  .map(e => (e._1 + 100, e._2))

windowedSum.print()
lateEventsStream.print()
// execute program
env.execute("Flink Scala watermarking test")

将导致:

[info] (1,1)
[info] (1,15)
[info] (1,25)
[info] (101, 8)

相反,我得到了:

[info] (2,1)
[info] (1,15)
[info] (1,25)

如果我使用socketTextStream作为具有相同数据的源,它将按预期工作。

这告诉我,在数据输入速度非常快的情况下,水印并没有像应该的那样前进。我尝试将setAutoWatermarkInterval调整为一个非常小的值,但没有成功。

我错过了什么吗?我如何测试我的工作?

共有2个答案

池照
2023-03-14

感谢@Dominik Wosinski为我指明了正确的方向。对于在Flink文档的复杂性中迷失的其他人,我将在这里发布我的解决方案

正如怀疑的那样,问题是快速输入数据没有推进水印。这是因为默认情况下,Flink将每200ms检查一次水印是否应该推进。您可以使用

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.setAutoWatermarkInterval(10) // or even lower

显然,这对于作为4元素列表的快速输入是不够的。

解决方案是在每次事件时发出水印(请注意,在生产环境中不建议这样做)。

要实现这种解决方案,我们需要扩展WatermarkGenerator类:

class MyPunctuatedWatermarkAssigner extends WatermarkGenerator[(Int, Long)] {

  override def onEvent(
      event: (Int, Long),
      eventTimestamp: Long,
      output: WatermarkOutput
  ): Unit = {
    // emit at every event
    output.emitWatermark(new Watermark(event._2))
  }

  // do nothing at AutoWatermarkInterval
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {}
}

要将此生成器分配给流,我们首先需要创建水印策略:

class MyStrategy extends WatermarkStrategy[(Int, Long)] {
  override def createWatermarkGenerator(
      context: WatermarkGeneratorSupplier.Context
  ): WatermarkGenerator[(Int, Long)] = new MyPunctuatedWatermarkAssigner
}

(此类还可以实现可选的CreateTimestAssigner方法)

然后我们可以在流中使用它:

eventsStream
   .assignTimestampsAndWatermarks(new MyStrategy())
贺高飞
2023-03-14

您可以尝试使用水印策略,该策略将为每个元素发出水印,而不是定期生成它们。实现水印生成器时,可以通过在元素内部发出水印来实现这一点,如下所述。这是用于测试的最佳和最可靠的方法。

 类似资料:
  • 我们计划将Apache Flink与一个巨大的IOT设置一起使用。客户将向我们发送某种结构化的传感器数据(如sensor_id、sensor_type、sensor_value、timestamp)。我们没有控制每个客户何时发送这些数据,最有可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:我们可以假设每个传感器的事件是按顺序来的。 在开始实施可能的流式管道之

  • 我正在编写一个Flink应用程序,它使用kafka主题中的时间序列数据。时间序列数据包含度量名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口来根据度量键(度量名称、键值对和时间戳的组合)聚合数据。这里是主流看起来像 我还想检查是否有任何指标在上面的窗口外迟到。我想检查有多少指标延迟到达,并计算延迟指标与原始指标相比的百分比。我正在考虑使用flink的“允许延迟”功能将延迟指标发送到不同

  • 主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口

  • 看看这篇关于水印的文章 在那篇文章的后面,它解释了当设置允许迟到时: Flink不会丢弃邮件,除非它超过了window\u end\u允许的延迟时间 由于设置了允许的延迟,是否实际导致了对窗口的延迟评估? 那么水印和允许迟到的用法到底有什么不同呢?什么时候使用哪个?

  • 一、窗口概念 在大多数场景下,我们需要统计的数据流都是无界的,因此我们无法等待整个数据流终止后才进行统计。通常情况下,我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以