当前位置: 首页 > 知识库问答 >
问题:

FlinkKafkaConsumer中事件时间顺序的保证

尉迟京
2023-03-14

TL;DR:目前保证Flink中事件时间顺序的最佳解决方案是什么?

我使用Flink 1.8.0和Kafka 2.2.1。我需要通过事件时间戳保证事件的正确顺序。我每隔1秒生成周期性水印。我使用Flink Kafka消费者与AscendingTimestampExtractor:

java prettyprint-override">val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
      override def extractAscendingTimestamp(element: T): Long =
        timestampExtractor(element)
      })
 .addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)

然后处理:

myStream
   .keyBy(ev => (ev.name, ev.group))
   .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

我意识到,对于在同一毫秒或几毫秒之后发生的无序事件,Flink不会纠正顺序。我在文档中发现:

水印触发所有窗口的计算,其中最大时间戳(结束时间戳-1)小于新水印

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-水印和窗口的应用

所以我准备了额外的处理步骤来保证事件时间顺序:

myStream
      .timeWindowAll(Time.milliseconds(100))
      .apply((window, input, out: Collector[MyEvent]) => input
        .toList.sortBy(_.getTimestamp)
        .foreach(out.collect) // this windowing guarantee correct order by event time
      )(TypeInformation.of(classOf[MyEvent]))
      .keyBy(ev => (ev.name, ev.group))
      .mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)

然而,我发现这个解决方案很难看,它看起来像一个变通方法。我还担心KafkaSource的每分区水印

理想情况下,我希望将顺序保证放在KafkaSource中,并为每个kafka分区保留它,就像每个分区的水印一样。有可能吗?目前保证Flink事件时间顺序的最佳解决方案是什么?

共有2个答案

苏鸿才
2023-03-14

这是一个伟大的观点。在KafkaSource中保证秩序实际上包括两个部分。

  1. 保证同一子任务中分区之间的顺序。
  2. 保证子任务之间的顺序。

第一部分已经在进行中https://issues.apache.org/jira/browse/FLINK-12675.第二部分需要子任务之间共享状态的支持,这可能需要社区中更多的讨论和详细的计划。

回到你的问题,我认为通过设置缓冲数据的窗口来保持事件的顺序是目前最好的解决方案。

轩辕华辉
2023-03-14

Flink不保证按事件时间顺序处理记录。分区内的记录将按其原始顺序进行处理,但当两个或多个分区合并到一个新分区时(由于流的重新分区或合并),Flink会将这些分区的记录随机合并到新分区中。其他一切都将是低效的,并导致更高的延迟。

例如,如果作业有一个从两个Kafka分区读取的源任务,两个分区的记录将以某种随机的锯齿形模式合并。

但是,Flink保证针对生成的水印正确处理所有事件。这意味着,水印永远不会超过记录。例如,如果Kafka源生成每个分区的水印,即使合并了多个分区的记录,水印仍然有效。水印用于收集和处理时间戳小于水印的所有记录。因此,它确保输入数据的完整性。

这是按时间戳对记录进行排序的先决条件。你可以用一个翻滚的窗户来做这一切。但是,您应该知道

  1. 所有窗口都将在单个任务中执行(即,它不是并行的)。如果每个键的顺序足够,您应该使用一个常规的翻滚窗口,或者更好地实现一个更高效的KeyedProcessFunction
 类似资料:
  • 问题1:表中中的行的事件总是具有相同的键,对吗? 问题2:由于kafka会将具有相同密钥的数据发送到相同分区,所以我可以说的事件可以有序地使用,对吗? 问题3:如果我将主键更改为varchar,那么该键将发生变化,因此分区号可能会发生变化,在这种情况下,我如何保证事件总是有序地消耗?

  • 总之,我希望flatMap1()和flatMap2()按照我在事件中设置的时间戳的顺序被调用。但那不是真的。

  • 阅读akka-stream的留档,我不太清楚消息的顺序以及是否可以强制执行。让我用我为聊天服务器编写的一小段代码来设置我的问题的上下文。 为了让事情变得简单,我使用了这个流的形状和一个非常简单的源和汇。像这样的-- 现在,我的担忧来了。终端中打印的事件顺序根本不正常。我不知道该怎么解决。这是我得到的结果-- 输出中缺少第一条消息。消息似乎是在打印之前发送的。 我尝试通过使用(我在上面的代码中对此进

  • 我想通过moding user_id来创建用户事件的N个分区N以便用户可以按照发送事件的顺序处理事件。 如果我曾经决定N不足以处理负载,并且希望分别增加分区和使用者的数量,那么在使用用户事件时,我必须做什么来保留事件顺序呢?

  • 问题内容: 我有以下代码: 但是,当我执行此代码时,在图形 之前 会收到响应“ hello” 。为什么会这样呢?我将如何改变它以便我首先得到图形? 问题答案: 异步,您永远不知道哪个函数先运行\先完成… 想想异步操作,例如告诉一群人跑1英里,您知道谁会先完成吗?(是的,乔恩·斯基特,然后是查克·诺里斯…) 您可以使用Callack来运行第二个ajax:

  • 我使用Flink数据流API中的< code > keyedcorprocessfunction 类来实现一个超时用例。场景如下:我有一个输入kafka主题和一个输出kafka主题,一个服务从输入主题中读取并处理它(持续可变的时间),然后在输出Kafka主题中发布响应。 现在要实现超时(必须使用Flink datastream API),我有一个从kafka输入主题读取的和另一个从kafka输出主