当前位置: 首页 > 知识库问答 >
问题:

为什么一个新的消费者组id不从头开始

赖星驰
2023-03-14
@Grab('org.apache.kafka:kafka-clients:0.10.0.0')

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.PartitionInfo

Properties props = new Properties()
props.with {
    put("bootstrap.servers","***********:9091")
    put("group.id","script-test-noseek")
    put("enable.auto.commit","true")
    put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    put("session.timeout.ms",30000)
}

KafkaConsumer consumer = new KafkaConsumer(props)
def topicMap = [:]
consumer.listTopics().each { topic, partitioninfo ->
        topicMap[topic] = 0
}

topicMap.each {topic, count ->
    def stopTime = new Date().time + 30_000
    def stop = false

    println "Starting topic: $topic"
    consumer.subscribe([topic])
    //consumer.subscribe([topic], new CRListener(consumer:consumer))
    while(!stop) {
        ConsumerRecords<String, String> records = consumer.poll(5_000)
        topicMap[topic] += records.size()
        consumer.commitAsync()
        if ( new Date().time > stopTime || records.size() == 0) {
            stop = true
        }
    }    
    consumer.unsubscribe()
}

def total = 0
println "------------------- Results -----------------------"
topicMap.each { k,v ->
    if ( v > 0 ) {
        println "Topic: ${k.padRight(64,' ')} Records: ${v}"
    }
    total += v
} 
println "==================================================="
println "Total: ${total}"
def dummy = "Process End"

class CRListener implements ConsumerRebalanceListener {
    KafkaConsumer consumer
    void onPartitionsAssigned(java.util.Collection partitions) {
        consumer.seekToBeginning(partitions)
    }
    void onPartitionsRevoked(java.util.Collection partitions) {
        consumer.commitSync()
    }
}

我看不出我做错了什么。如有任何帮助,不胜感激。

共有1个答案

诸葛文博
2023-03-14

如果您使用一个新的使用者组id,并希望从头阅读整个主题,则需要在属性中指定参数“auto.offset.reset=reasiry”。(默认值为“最新”)

Properties props = new Properties()
props.with {
    // all other values...
    put("auto.offset.reset","earliest")
}

在消费者启动时,会发生以下情况:

  1. 查找(有效的)提交偏移量以便使用group.id
  2. 如果找到(有效)偏移量,请从那里继续
  3. 如果未找到(有效)偏移量,请根据auto.offset.reset
  4. 设置偏移量
 类似资料:
  • 我正在编写一个概念验证应用程序来使用Apache Kafka0.9.0.0中的消息,看看是否可以使用它而不是通用的JMS消息代理,因为Kafka提供了好处。这是我的基本代码,使用新的消费者API: 我使用默认设置启动了一个kafka服务器,并使用shell工具启动了一个kafka生产者,以便将消息写入我的主题。然后,我使用这段代码与两个使用者连接,发送正确的服务器来连接,发送主题来订阅,其他一切都

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 我以前认为设置我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。 由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此策略将始终适用。我的疑虑就从这里开始了。假设以下场景: 我有两个客户端应用程序,A和B,每个客

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 我们启动一个Kafka消费者,监听一个可能还没有创建的主题(不过,主题自动创建是启用的)。 此后不久,一位制作人发表了关于这个话题的消息。 Kafka原木