当前位置: 首页 > 知识库问答 >
问题:

为什么我的消费者每次都在阅读该主题的所有消息,即使auto.offset.reset=最大?

邰建业
2023-03-14

我在topic1上向Kafka发送了5条消息并成功消费了它们。当我发送第6条消息并尝试消费时,我再次收到所有6条消息,而不是最新的(第6条)消息。

请注意,我正在运行消费者命令行,而不是从数据库连接器(访问模块)。连接器的配置属性auto.offset.reset设置为“最大”。(请查看下面日志中的所有配置属性)

另请参见下面的OffsetChecker输出:

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
    --group testjob --zookeeper localhost:2181 --topic topic1

[2017-07-06 21:57:46,707] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/testjob/offsets/topic1/0.

任何人都可以让我知道问题出在哪里?

以下是显示配置属性的日志:

***Global config Properties***
*             client.id = rdkafka
*             message.max.bytes = 1200
*             receive.message.max.bytes = 100000000
*             metadata.request.timeout.ms = 60000
*             topic.metadata.refresh.interval.ms = 600000
*             topic.metadata.refresh.fast.cnt = 10
*             topic.metadata.refresh.fast.interval.ms = 250
*             topic.metadata.refresh.sparse = false
*             socket.timeout.ms = 60000
*             socket.send.buffer.bytes = 0
*             socket.receive.buffer.bytes = 0
*             socket.keepalive.enable = false
*             socket.max.fails = 3
*             broker.address.ttl = 300000
*             broker.address.family = any
*             statistics.interval.ms = 0
*             log_cb = 0x7fecb80c6dd0
*             log_level = 6
*             socket_cb = 0x7fecb80cd2f0
*             open_cb = 0x7fecb80ddd30
*             opaque = 0x2641280
*             internal.termination.signal = 0
*             queued.min.messages = 100000
*             queued.max.messages.kbytes = 1000000
*             fetch.wait.max.ms = 100
*             fetch.message.max.bytes = 1049776
*             fetch.min.bytes = 1
*             fetch.error.backoff.ms = 500
*             group.id = testjob
*             queue.buffering.max.messages = 100000
*             queue.buffering.max.ms = 1000
*             message.send.max.retries = 2
*             retry.backoff.ms = 100
*             compression.codec = none
*             batch.num.messages = 1000
*             delivery.report.only.error = false
*             request.required.acks = 1
*             enforce.isr.cnt = 0
*             request.timeout.ms = 5000
*             message.timeout.ms = 300000
*             produce.offset.report = false
*             auto.commit.enable = true
*             auto.commit.interval.ms = 60000
*             auto.offset.reset = largest    <<<<--------
*             offset.store.path = .
*             offset.store.sync.interval.ms = 0
*             offset.store.method = file
*             consume.callback.max.messages = 0

共有1个答案

傅乐湛
2023-03-14

添加此属性AUTO_OFFSET_RESET_CONFIG=“最早”它将工作

 类似资料:
  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

  • 为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:

  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

  • 我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk