当前位置: 首页 > 知识库问答 >
问题:

如何让Kafka的消费者从最后消费的偏移量开始阅读而不是从头开始

钮高朗
2023-03-14

我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。

我正在写一个例子,这样我的意图就不会偏离。

Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at 7:30 PM, without from-beginning.
5) Now, it Will read the messages produced after it has started. Not the earlier ones, which were produced at 7.20 PM

有没有一种方法可以获取从上次使用的偏移量生成的消息。?

共有3个答案

东门晟
2023-03-14

您应该将使用者配置中的 auto.offset.reset 参数设置为 max,以便它将读取上次提交偏移量后的所有消息。

孔波
2023-03-14

在使用者配置中设置 auto.offset.reset=最早,以及固定的 group.id=something 将在上次提交的偏移量处启动使用者。在您的情况下,它应该在 7:20 的第一条消息开始消费。如果您希望它在启动后开始读取发布的消息,则 auto.offset.reset=latest 将忽略在 7:20 发送的 10 条消息,并读取启动后传入的任何消息。

如果您希望它从一开始就开始,您必须在第一个consumer.poll()之后调用consumer.poll,或者将消费者组ID更改为唯一的。

严兴旺
2023-03-14

我是 kafka 的新手,并试图了解是否有办法从上次使用的偏移量读取消息,但不是从头开始。

是的,可以使用控制台消费者从上次使用的偏移量读取。您必须在调用kafka-convole-消费者时添加consumer.config标志。

例:-

[root@sandbox bin]# ./kafka-console-consumer.sh --topic test1 --zookeeper localhost:2181 --consumer.config /home/mrnakumar/consumer.properties

这里的/home/mrnakumar/consumer . properties是一个包含group.id的文件

group.id=consoleGroup

不使用消费者。config,可以从开头[通过使用--from-begin]或仅从日志结尾读取。日志结束意味着消费者启动后发布的所有消息。

 类似资料:
  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

  • 我有以下代码 消费者订阅的主题会不断收到记录。有时,消费者会因处理步骤而崩溃。然后,当使用者重新启动时,我希望它从主题的最新偏移量开始使用(即,忽略在使用者关闭时发布到主题的记录)。我认为方法可以确保这一点。然而,这种方法似乎毫无效果。消费者从其崩溃的偏移量开始消费。 什么是正确的方式使用? 编辑:使用以下配置创建消费者

  • 如有任何帮助,我们将不胜感激。

  • 我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2