我有以下代码
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
consumer.seekToEnd(emptyList())
val pollDuration = 30 // seconds
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
消费者订阅的主题会不断收到记录。有时,消费者会因处理步骤而崩溃。然后,当使用者重新启动时,我希望它从主题的最新偏移量开始使用(即,忽略在使用者关闭时发布到主题的记录)。我认为seekToEnd()
方法可以确保这一点。然而,这种方法似乎毫无效果。消费者从其崩溃的偏移量开始消费。
什么是正确的方式使用asikToend()
?
编辑:使用以下配置创建消费者
fun <T> buildConsumer(valueDeserializer: String): KafkaConsumer<String, T> {
val props = setupConfig(valueDeserializer)
Common.setupConsumerSecurityProtocol(props)
return createConsumer(props)
}
fun setupConfig(valueDeserializer: String): Properties {
// Configuration setup
val props = Properties()
props[ConsumerConfig.GROUP_ID_CONFIG] = config.applicationId
props[ConsumerConfig.CLIENT_ID_CONFIG] = config.kafka.clientId
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = config.kafka.bootstrapServers
props[AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = config.kafka.schemaRegistryUrl
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = config.kafka.stringDeserializer
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = valueDeserializer
props[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = config.kafka.maxPollIntervalMs
props[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = config.kafka.sessionTimeoutMs
props[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = "false"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "latest"
return props
}
fun <T> createConsumer(props: Properties): KafkaConsumer<String, T> {
val consumer = KafkaConsumer<String, T>(props)
consumer.subscribe(listOf(config.kafka.inputTopic))
return consumer
}
我找到解决办法了!
我需要添加一个虚拟轮询作为消费者初始化过程的一部分。由于有几种Kafka方法是惰性评估的,因此有必要使用虚拟轮询将分区分配给使用者。如果没有虚拟轮询,使用者将尝试查找空分区的末尾。因此,seekToEnd()
无效。
重要的是,虚拟轮询持续时间要足够长,以便分配分区。例如,消费者。轮询((持续时间秒(1))
,在程序转到下一个方法调用(即seekToEnd()
)之前,分区没有得到分配的时间。
工作代码可能是这样的
class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) {
fun run() {
// Initialization
val pollDuration = 30 // seconds
consumer.poll((Duration.ofSeconds(pollDuration)) // Dummy poll to get assigned partitions
// Seek to end and commit new offset
consumer.seekToEnd(emptyList())
consumer.commitSync()
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
// perform record analysis and commitSync()
}
}
}
}
seekToEnd
方法需要关于实际分区的信息(用Kafka术语TopicPartition
),您计划在该分区上让消费者从头到尾阅读。
我不熟悉静态编程语言API,但检查JavaDocs上的Kafka消费者的方法asikToend,你会看到,它要求一个集合的主题分区。
由于您当前使用的是emptyList()
,所以它不会产生任何影响,就像您观察到的那样。
相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步
我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
我已经编写了一个Java Kafka消费者。我想确定如何明确确保一旦Kafka消费者启动,它只读取从那时起由制作人发送的消息,即它不应读取制作人已发送给Kafka的任何消息。有人能解释一下如何确保这一点吗 这是我使用的属性的片段 更新9月14日: 我使用的是以下属性,似乎消费者有时仍然从一开始就阅读,有人能告诉我现在出了什么问题吗? 我使用Kafka版本0.8.2
如有任何帮助,我们将不胜感激。
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?