当前位置: 首页 > 知识库问答 >
问题:

如何从Kafka中的旧偏移点获取数据?

戈曾琪
2023-03-14

我使用动物园管理员从Kafka获取数据。在这里,我总是从最后一个偏移点获取数据。有没有办法指定偏移时间来获取旧数据

有一个选项“自动偏移”。重置。它接受最小的或最大的。有人能解释一下什么是最小的和最大的吗。可以自动偏移。重置有助于从旧偏移点而不是最新偏移点获取数据?

共有3个答案

拓拔欣嘉
2023-03-14

请参考关于kafka config:http://kafka.apache.org/08/configuration.html的文档,查询偏移参数的最小值和最大值。

顺便说一句,在探索Kafka时,我想知道如何为消费者重播所有消息。我的意思是,如果一个消费者团体已经调查了所有的信息,并且想重新获得这些信息。

它可以实现的方式是从动物园管理员删除数据。使用kafka.utils.ZkUtils类删除zoowatch上的节点。下面是它的用法:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
徐欣德
2023-03-14

从Kafka文档中,他们说“Kafka.api.OffsetRequest.EarliestTime()在日志中找到数据的开头并从那里开始流式处理,Kafka.api.OffsetRequest.LatestTime()将只流式处理新消息。不要假设偏移量0是开始偏移量,因为消息会随着时间的推移从日志中过时。”

在此处使用SimpleConsumerExample:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0SimpleConsumer示例

类似的问题:Kafka高级消费者使用JavaAPI从主题获取所有消息(相当于--从头开始)

这可能会有帮助

尤夕
2023-03-14

使用者始终属于一个组,对于每个分区,Zookeeper跟踪分区中该使用者组的进度。

要从头开始获取,您可以删除Hussain引用的与进度相关的所有数据

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

您还可以指定所需分区的偏移量,如core/src/main/scala/kafka/tools/UpdateOffsetsInZK中所指定的。斯卡拉

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

然而,偏移量没有时间索引,但您知道每个分区是一个序列。

如果您的消息包含一个时间戳(请注意,这个时间戳与Kafka收到您的消息的时刻无关),您可以尝试使用索引器,尝试通过将偏移量增加N来逐步检索一个条目,并将元组(主题X,第2部分,偏移量100,时间戳)存储在某处。

当您想从指定的时间时刻检索条目时,您可以对粗略索引应用二进制搜索,直到找到您想要的条目并从那里获取。

 类似资料:
  • 我正在使用事务性KafkaProducer向主题发送消息。这个很管用。我使用的是具有read_committed隔离级别的KafkaConsumer,而我的seek和seekToEnd方法存在问题。根据文档,seek和seekToEnd方法给出了LSO(上次稳定偏移量)。但这有点让人摸不着头脑。因为它给我的价值总是一样的,主题结束了。无论最后一个条目是(由生产者提交的)还是中止的事务的一部分。例如

  • 问题内容: 我知道相反。给定一个时区,我可以通过以下代码片段获取时区偏移量: 我想知道如何从时区偏移量获取时区名称。 鉴于 (以毫秒为单位; +6.00偏移) 我想得到以下任何可能的时区名称的结果: 问题答案: 用

  • 问题内容: 有没有一种方法可以获取,样式名称,甚至可以将插入时我给文本的样式在某个位置上甚至与之进行比较?因为我的目的,我创建的自定义,和。因此,我可以选择用于表示常规字母,并用于表示数字的另一种样式。我还具有切换按钮,该按钮在切换时设置为以不同的方式设置数字格式,而在未切换时不定期设置数字格式,因此最后您无法仅根据方法区分哪些数字受到了影响。因此,唯一的方法是比较具有常规和特殊数字样式作为常量的

  • 试图了解发生这种情况时会发生什么。如果我们试图读取的偏移量丢失(我假设是因为kafka GC'ed the offet)和<code>auto.offset。reset=latest我们是否跳过数据到最新偏移量? 避免数据丢失的安全配置是什么?

  • 问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终

  • 问题内容: 现在,Golang Kafka库(sarama)提供了使用者组功能,而kafka 10没有任何外部库帮助。如何在任何给定时间获得使用者组正在处理的当前消息偏移量? 以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo- go )来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在,我使用sarama- cluster(ht