当前位置: 首页 > 知识库问答 >
问题:

Kafka流:如何在申请再处理之前获得偏移限制以及如何停止

郑和泰
2023-03-14

我正在寻找一种在KafkaStreams中做一个重新处理工具的方法,这将允许在一个主题中从一开始就重新处理数据(应用一些过滤器并将那些事件的更新版本写入同一个主题)。同时,有一个长期运行的应用程序处理来自该主题的数据。

为了只重新处理到应用程序启动并在此之后停止的时间点,需要知道何时停止,这是在该点上最新产生的偏移量。例如。可以在启动具有(分区->偏移量)的拓扑之前构造一个映射来了解这些限制,因此应用程序将能够在达到该偏移量时停止,将当前分区和偏移量(通过处理器API)与初始映射上的偏移量限制进行比较。

从Kafka流中访问最新的偏移信息是否可能/是否有意义?有没有别的办法来解决这个问题?(我想你可以通过Kafka的普通消费者来创造它,寻找目标和定位,但我想问的是在KafkaStreams中是否有一个集成的解决方案)。

此外,如何仅在所有分区都达到其偏移量时才温和地停止应用程序,知道这些信息是分布的,因此您需要了解所有实例的状态?

Kafka/Kafkastreams 2.1、Scala 2.12

共有1个答案

轩辕瑞
2023-03-14

使用消费者来获得末端偏移似乎是合理的。为了停止应用程序,您需要构建一个跟踪进度的手动解决方案。例如,使用TransformValues()可以检查输入记录的主题名称、分区和偏移量(使用通过init()方法提供的Context对象)。这应该允许您在处理所有数据时调用kafkastreams#close()

您可能对这个KIP(在活动atm中)感兴趣,它讨论了类似的思想:https://cwiki.apache.org/confluence/display/kafka/KIP-95%3a+增量+批处理+处理+for+kafka+流

 类似资料:
  • 1)手机端/电脑端:当有新人申请时,系统通知会推送申请,选择同意或拒绝。 2)企业管理-首页-处理新人申请或人员管理-人事管理-新人申请

  • 在我的网站中,用户A创建了一个从上午9:00开始的事件。我将该时间转换为GMT,这样,如果用户B在两小时前的不同时区看到该事件,它可以转换为用户B的时区,并显示该事件在11:00AM开始。 我唯一的想法是,当我得到如下所示的时区偏移量时,我是否应该总是将日期设置为一月,以便无论实际月份如何,我总是得到相同的时区偏移量?或者有没有更好的方法来处理这件事?

  • 我在Kafka·吉拉也描述了这个问题:https://issues.apache.org/jira/browse/KAFKA-13014 我们有多个实例和线程的Kafka流。 这个Kafka流消耗了很多话题。 其中一个主题分区一天内无法访问,主题保留时间为4小时。 解决问题后,Kafka流正试图从不再存在的偏移量中消费: Kafka消费群体描述: 我们可以看到KS正在等待的当前偏移量是 Kafka

  • 问题内容: 使用以下html,当我将鼠标悬停在孩子身上时,我的父母得到绿色背景。我该如何阻止这种情况的发生?如果我将鼠标悬停在子元素之外,我确实希望使用绿色背景。 CSS3很好。 问题答案: 因此,这确实很丑陋,但是(确实)有效。我基本上是在创建父母的副本作为孩子的兄弟姐妹。默认情况下,parent- overwrite是隐藏的,然后显示在child的悬停上。除非您使用+选择器而不是〜选择器,否则

  • 我正在尝试调用我的类来检查存储权限 但问题是,它不让我在一个类中实现它,我得到了以下错误- "类型权限的方法onRequest estPermissionsResult(int, String[], int[])必须覆盖或实现超类型方法" 类型权限的方法requestPermissions(字符串[],int)未定义 这是我的密码- }