当前位置: 首页 > 知识库问答 >
问题:

Kafka流:处理来自不同分区的消息时的事件-时间偏斜

柳杰
2023-03-14

让我们考虑一个主题,该主题具有多个分区,并且消息以事件-时间顺序编写,没有任何特定的分区方案。Kafka Streams应用程序对这些消息进行一些转换,然后按某个键分组,然后按给定宽限期的事件时间窗口聚合消息。

每个任务可以以不同的速度处理传入的消息(例如,因为运行在具有不同性能特征的服务器上)。这意味着在groupBy shuffle之后,当内部主题的同一分区中的消息源自不同任务时,事件时排序将不会在它们之间保留。过一段时间,事件时间偏差可能会变得大于宽限期,这将导致丢弃源自滞后任务的消息。

增加宽限期似乎不是一个有效的选项,因为它会延迟最终聚合结果的发布。Apache Flink通过在分区合并上发出最低的水印来处理这一问题。

它应该是一个真正的关注,特别是在处理大量历史数据的时候,还是我错过了什么?Kafka流提供了一个解决方案来处理这种情况吗?

更新我的问题不是关于KStream-KStream联接,而是关于在流洗牌之前的单个KStream事件时聚合。

请考虑以下代码段:

stream
  .mapValues(...)
  .groupBy(...)
  .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
  .aggregate(...)

我假设mapValues()操作对于某些任务来说可能很慢,不管是什么原因,因为任务处理消息的速度不同。当aggregate()运算符发生混洗时,任务0可能处理了时间T之前的消息,而任务1仍然处于T-skew,但来自两个任务的消息最终交织在内部主题的单个分区中(对应于分组键)。

我关心的是,当偏斜足够大时(在我的示例中超过10秒),来自滞后任务1的消息将被丢弃。

共有1个答案

淳于涛
2023-03-14

基本上,任务/处理器维护一个流时间,它被定义为任何已经轮询的记录的最高时间戳。然后,在Kafka流中,该流时间被用于不同的目的(例如:标点器、窗口聚合等)。

[窗口聚合]

正如您提到的,stream-time用于确定是否应该接受记录,即record_accepted=end_window_time(当前记录)+grace_period>observed stream_time

正如您所描述的,如果多个任务并行运行以基于分组键来洗牌消息,并且某些任务比其他任务慢(或者某些分区脱机),这将创建无序的消息。不幸的是,恐怕唯一的处理方法是增加grace_perio

这实际上是可用性和一致性之间永恒的权衡。

[KafkaStream和KafkaStream/Ktable连接的行为

提取的记录将每个分区缓冲到由任务管理的内部队列中。因此,每个队列包含等待处理的单个分区的所有记录。

然后,从队列中逐个轮询记录,并由拓扑实例处理。但是,这是来自具有最低时间戳的非空队列的记录,该记录是从轮询的返回的。

此外,如果队列为空,则任务可能在一段时间内变得空闲,从而不再从队列轮询更多记录。您可以实际配置任务保持空闲的最长时间,可以使用流配置来定义:max.Task.idle.ms

这种机制允许同步共本地化分区。Bu,默认max.task.idle.ms设置为0。这意味着任务永远不会等待来自分区的更多数据,这可能会导致记录被过滤,因为流时间可能会增加得更快。

 类似资料:
  • 我有一个Kafka主题,目前有3个分区。我希望我的消费者从同一个分区读取,但每条消息都应该以循环方式发送给不同的消费者。有可能实现吗?

  • 我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。

  • 寻找设计我的Kafka消费者的最佳方法。基本上,我想看看什么是避免数据丢失的最佳方法,以防在处理消息期间出现任何异常/错误。 我的用例如下。 a)我使用SERVICE来处理消息的原因是 - 将来我计划编写一个ERROR处理器应用程序,该应用程序将在一天结束时运行,它将尝试再次处理失败的消息(不是所有消息,而是由于任何依赖项(如父级缺失)而失败的消息)。 b)我想确保没有消息丢失,所以我会将消息保存

  • 如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何

  • 我需要打印/记录/存储处理消息的kafka分区和偏移量。我如何才能做到这一点?我使用StreamBridge从制作人那里发送消息,还使用功能性spring kafka streams方法

  • 我创建了一个带有10个分区的Kafka主题,并尝试通过单个Kafka消费者来消费消息。但是,kafka consumer并不是从所有分区读取消息。更具体地说,它只使用来自5个特定分区的消息。示例:使用者仅使用来自[0,1,2,3,4]的消息。在重新启动之后,如果它开始使用来自[5,6,7,8,9]的消息,那么它将只使用来自这些分区的消息。下面是kafka-consumer-offset-check