enable.auto.commit = false
auto.offset.reset = earliest
相反,我需要做的是将group.id
更改为新的内容,然后它将从最早的偏移量恢复。
会不会有其他的犯罪行为?
更新
if (endpoint.isAutoCommitEnable() != null && !endpoint.isAutoCommitEnable()) {
if (processed >= endpoint.getBatchSize()) {
consumer.commitSync();
processed = 0;
}
}
根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
你的理解听起来是正确的。
Kafka0.9有“旧”和“新”消费者配置。此配置属性在它们之间发生了更改。
auto.commit.enable = false
enable.auto.commit = false
https://kafka.apache.org/documentation#ConsumerConfigs
2016-10-06 14:19:41,725 INFO [org.apache.kafka.clients.consumer.ConsumerConfig:165] - ConsumerConfig values:
group.id = service
bootstrap.servers = [kafka:9092]
enable.auto.commit = false
auto.offset.reset = latest
我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2
如上所述,我目前正在设置一个Kafka Connect Sink,将数据从Kafka传输到Google云存储中。 然而,一切都进展顺利——它只使用最新的可用偏移量。也就是说,一旦它开始运行,它只将新产生的消息下沉到GCS,而不是来自Kafka的已经存在的消息。我已经尝试删除kafka连接存储/偏移主题,创建一个新的连接器名称等。但是,它总是从最新的偏移量开始。 如果无论如何要为Kafka Conn
问题描述: 我们的Kafka consumer(在Spring Boot2.x中开发)正在执行几天。当我们重新启动这些消费者时,主题的所有消息都将被再次消费,但仅在特定条件下。 条件: 代理配置: 谢谢和问候
如有任何帮助,我们将不胜感激。
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?