当前位置: 首页 > 知识库问答 >
问题:

Kafka流:在窗口化时重新处理旧数据

邹举
2023-03-14

具有Kafka Streams应用,其通过例如1天的流连接来执行开窗(使用原始事件时间,而不是挂钟时间)。

如果启动此拓扑,并从头开始重新处理数据(如在 lambda 样式的体系结构中),此窗口是否会将旧数据保留在那里?da 例如:如果今天是2022-01-09,而我收到来自2021-03-01的数据,那么这个旧数据会进入表格,还是会从一开始就被拒绝?

在这种情况下,可以采取什么策略来重新处理这些数据?

使用Kafka Streams 2.5.0进行更新

共有1个答案

国俊艾
2023-03-14

更新了OP Kafka Streams 2.5版的答案:

在使用事件时间时,只要没有事件包含闹钟时间,Kafka Streams的行为将独立于闹钟时间。您不应该将WallclockTimestampExtractor配置为时间戳提取程序。

Kafka Streams会将输入主题分区分配给流任务,这将一次消耗一个事件的分区。在任何给定的主题上,至多一个分区将被分配给一个流任务。为每个流任务分别执行时间窗聚合。Kafka Streams为每个聚合使用一个名为“observedStreamTime”的内部时间戳来跟踪迄今为止看到的最大时间戳。将传入记录的时间戳与观察到的流时间进行比较。如果它们早于配置的时间窗口存储的保留宽限期,它们将被丢弃。否则,它们将根据配置进行聚合。该实现可以在https://github . com/Apache/Kafka/blob/d5b 53 ad 132d 1c 1 bfcd 563 ce 5015884 b 6 da 831777/streams/src/main/Java/org/Apache/Kafka/streams/kstream/internals/kstreamwindowaggregate . Java # L108-L175中找到

如果重置Kafka Streams应用程序,此处理将始终产生相同的结果。它独立于处理的执行时间。如果删除事件,则更改相应的指标。

当使用多个主题时,这种方法有一个警告。observedStreamTime将反映流任务读取的所有分区的最高时间戳。如果您有两个主题(可能是因为您想加入它们),其中一个包含的数据比另一个要年轻得多(可能是由于后者没有收到新数据),则观察到的StreamTime将以年轻的主题为主。如果时间窗口配置没有足够的保留期或宽限期,则可能会删除旧主题的事件。有关配置选项,请参阅TimeWindows的JavaDoc:https://github.com/apache/kafka/blob/d5b53ad132d1c1bfcd563ce5015884b6da831777/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindows.java

在您的示例中,只要流时间进展不太快,就可以接受旧数据。重新处理整个数据集应该是可行的,因为它会线性地贯穿主题。如果旧数据在超过窗口大小宽限期的时间窗口中聚合,Kafka Streams将拒绝该记录。在这种情况下,Kafka Streams还将发出错误消息,并相应调整其指标。因此,这种行为应该很容易察觉。

如果可行,我建议尝试这种再处理,并观察日志和指标。

 类似资料:
  • 我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。 然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。 我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需

  • 我们正在应用程序中使用apache kafka streams 0.10.2.0。我们利用kafka streams拓扑将处理后的数据传递到下一个主题,直到处理结束。 此外,我们使用AWS ECS容器来部署消费者应用程序。我们观察到消费者正在拾取非常旧的消息进行处理,尽管它们已经在更早的时候处理过。这个问题在服务扩展/缩减或新部署时随机发生。我知道在消费者重新平衡时,有些消息可以重新处理。但在这种

  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我有一个Kafka Streams应用程序,它从几个主题读取数据,连接数据并将其写入另一个主题。 每小时消耗/产生几百万条记录。每当我关闭一个代理时,应用程序就进入重新平衡状态,在重新平衡多次之后,它开始使用非常旧的消息。 注意:当Kafka Streams应用程序运行良好时,它的消费者滞后几乎为0。但再平衡之后,它的滞后从0到1000万。 这会不会是因为偏移.保留.分钟。 在这方面的任何帮助都将

  • 来自火花流背景-掌握Kafka流。 我有一个简单的Spark流媒体应用程序, 并返回该分钟内每个用户的最新事件 示例事件类似于 我感兴趣的是这将如何在Kafka流中工作,因为似乎每个事件都有一个输出--当我的用例是减少流量时。 从我到目前为止的阅读来看,这似乎不是直接的,您将不得不使用处理器API。 理想情况下,我希望使用DSL而不是处理器API,因为我刚刚开始研究Kafka流,但似乎我必须使用处