当前位置: 首页 > 知识库问答 >
问题:

从特定偏移量到结束偏移量的流处理

洪通
2023-03-14

可以从输入主题的特定偏移量到结束偏移量进行Kafka流处理吗?

我有一个Kafka流应用程序消耗输入主题,但由于某种原因失败了。我修复了问题并再次启动它,但它从输入主题的最新偏移量开始消耗。我知道应用程序已处理的输入主题的偏移量。现在,我如何将输入主题从一个偏移量处理到另一个偏移量。我正在使用合流平台5.1.2。

共有1个答案

谢烨烨
2023-03-14

默认情况下,KStreams支持auto.offset.reset的两个可能值。它可以是“最早”或“最新”。您不能将其设置为应用程序代码中的特定偏移量。

在应用程序重置期间有一个选项。如果您使用应用程序重置脚本,您可以使用--to-偏移属性并将其分配给特定的偏移量。它会将应用程序重置到该点。

<path-to-confluent>/bin/kafka-streams-application-reset --application-id app1 --input-topics a,b --to-offset 1000

您可以在留档中找到详细信息:
https://docs.confluent.io/5.1.2/streams/developer-guide/app-reset-tool.html

在这种情况下,如果您正在修复错误,最好尽可能重置为最早的状态。

 类似资料:
  • 问题内容: 是否可以跳过X个第一行,并在一个查询中选择所有其他行?像那样: 它将选择:pqr,stu,vwx,yz 我尝试使用LIMIT和OFFSET完成此操作,但是问题是表是动态的,而且我不知道应该输入哪个LIMIT(我不知道表中有多少行)。 问题答案: 如果只需要最后N行,请尝试以下操作: 这会根据的顺序为您提供最后几条记录。 您可以使用自动递增的主键(希望有一个主键)来确定行的顺序(如果无法

  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 问题内容: 嘿,我正在尝试打开文件,仅从偏移量读取一定长度!我阅读了以下主题: 如何使用Java中的文件中的特定行号读取特定行? 在那儿,它说在不读取之前就不可能读取某行,但是我想知道字节! 是否可以从已知偏移量读取某些字节? 问题答案: RandomAccessFile提供一个功能:

  • 我用Kafka和spring-布特: Kafka制作人班: Kafka-配置: 问题: 我有一个主题的5个分区,比方说。 发生的情况是,我获得成功(即消息成功发送到Kafka)日志,但是topic的无分区的偏移量增加。 正如您在上面看到的,我添加了日志和。我所期望的是,当Kafka不能发送消息给Kafka时,我应该得到一个错误,但在这种情况下,我没有收到任何错误消息。 Kafka的上述行为以的比例

  • 问题内容: 为什么在多索引DataFrame时不能使用偏移量? 例如,使用: 如果我尝试使用偏移量进行分组和滚动,则会显示“ ValueError:窗口必须为整数 ”: 并不是说以下这些变体可以满足我的需求,但是请注意对作品进行分组和滚动: 我可以在DataFrame的单索引子集上使用偏移量滚动: 如果确实不可能在多索引DataFrame上进行偏移滚动,那么将零应用于每个0级索引项的最有效的解决方