当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者从一开始就不消费

陶修洁
2023-03-14

我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。

现在我有一个单独的主题,描述如下:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic edu-topic --describe
Topic:edu-topic PartitionCount:3    ReplicationFactor:1 Configs:
    Topic: edu-topic    Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: edu-topic    Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: edu-topic    Partition: 2    Leader: 0   Replicas: 0 Isr: 0

我有一个生产者在消费者启动之前产生了一些消息,如下所示:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic edu-topic
>book 
>pen 
>pencil
>marker
>

当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息:

~/Documents/backups/kafka_2.12-2.2.0/data/kafka$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic edu-topic --group edu-service --from-beginning

但是,它显示的是新添加的消息。

我在这里怎么了?有什么帮助吗?

共有3个答案

向和歌
2023-03-14

国旗

--from-begining

将影响您的组使用者在首次启动/创建时的行为,或者存储的(上次提交的使用量)偏移量已过期(或者当您尝试重置存储的偏移量时)。

否则,组使用者将仅以存储的(上次提交)的偏移量继续。

请考虑从手册中获取更多信息。

姜智渊
2023-03-14

因为你使用的是旧的消费者群体——从一开始仅适用于尚未记录在Kafka集群上的新消费群。

要从头开始再次消费,您可以:

  • 使用标记--从头开始
  • 启动一个新的消费者组(更改组名)
  • 重置此消费者组的偏移。我还没试过,但你可以在这里测试一下
钱繁
2023-03-14

--from begin:如果使用者还没有确定的消费偏移量,则从日志中出现的最早消息开始,而不是最新消息。

Kafka消费者第一次使用-从头开始如果你重试,我怀疑你做了,它将从它离开的地方开始。您可以使用以下任一选项再次消费该邮件

  1. 使用以下方法重置消费者组偏移

Kafka-streams-application-reset . sh-application-id edu-服务-输入-主题edu-主题-引导-服务器localhost:9092-zookeeper 127 . 0 . 0 . 1:2181

然后从头开始重试

Kafka-console-consumer . sh-bootstrap-server localhost:9092-topic edu-topic-group edu-service-从头开始

kafka控制台消费者。sh--引导服务器localhost:9092--主题edu主题--组新的edu服务--从头开始

Kafka-console-consumer . sh-bootstrap-server localhost:9092-offset 0-partition 0-topic edu-topic

-偏移

 类似资料:
  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 我没有使用分区发布到Kafka主题。ProducerRecord(字符串主题、K键、V值)

  • 我在使用Kafka时遇到了一些问题。非常感谢任何帮助!我在docker swell中分别有zookeeper和kafka集群3个节点。您可以在下面看到Kafka代理配置。 我的情况: < li > 20x位制片人不断向Kafka主题传达信息 < li>1x消费者读取和记录消息 < li >终止kafka节点(docker容器停止),因此现在群集有2个Kafka代理节点(第3个节点将自动启动并加入群

  • null 我在这一页上读到以下内容: 使用者从任何单个分区读取,允许您以与消息生成类似的方式扩展消息消耗的吞吐量。 也可以将使用者组织为给定主题的使用者组-组内的每个使用者从唯一分区读取,并且组作为一个整体使用来自整个主题的所有消息。 如果使用者多于分区,则某些使用者将空闲,因为它们没有可从中读取的分区。 如果分区多于使用者,则使用者将从多个分区接收消息。 如果使用者和分区的数量相等,则每个使用者

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理

  • 我试图写一个Kafka消费者从一开始就消费这些信息。我可以从控制台消费者开始使用同样的方法 但是我在JAVA API中找不到相应的属性。 还有一个问题是什么应该是价值。Avro消息的反序列化程序?