当前位置: 首页 > 知识库问答 >
问题:

Kafka0.9:从最早的Kafka偏移量消费

宇文和昶
2023-03-14
enable.auto.commit = false
auto.offset.reset = earliest

相反,我需要做的是将group.id更改为新的内容,然后它将从最早的偏移量恢复。

会不会有其他的犯罪行为?

更新

if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
    if (processed >= endpoint.getBatchSize()) {
        consumer.commitSync();
        processed = 0;
    }
}

根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

共有1个答案

夏嘉德
2023-03-14

你的理解听起来是正确的。

Kafka0.9有“旧”和“新”消费者配置。此配置属性在它们之间发生了更改。

auto.commit.enable = false
enable.auto.commit = false

https://kafka.apache.org/documentation#ConsumerConfigs

2016-10-06 14:19:41,725 INFO [org.apache.kafka.clients.consumer.ConsumerConfig:165] - ConsumerConfig values:
    group.id = service
    bootstrap.servers = [kafka:9092]
    enable.auto.commit = false
    auto.offset.reset = latest
 类似资料:
  • 我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2

  • 如上所述,我目前正在设置一个Kafka Connect Sink,将数据从Kafka传输到Google云存储中。 然而,一切都进展顺利——它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只将新产生的消息下沉到GCS,而不是来自Kafka的已经存在的消息。我已经尝试删除kafka连接存储/偏移主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。 如果无论如何要为Kafka Conn

  • 问题描述: 我们的Kafka consumer(在Spring Boot2.x中开发)正在执行几天。当我们重新启动这些消费者时,主题的所有消息都将被再次消费,但仅在特定条件下。 条件: 代理配置: 谢谢和问候

  • 如有任何帮助,我们将不胜感激。

  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?