我正在使用admin client API查询kafka broker,以使用以下代码获取CONSUMER_GROUP的提交偏移量:
Map<TopicPartition, OffsetAndMetadata> offsets =
admin.listConsumerGroupOffsets(CONSUMER_GROUP)
.partitionsToOffsetAndMetadata().get();
上述代码将触发对特殊创建的_consumer_offsets主题的查询,以获取主题的每个分区(consumer_GROUP负责的分区)的提交偏移量。
另一方面,我使用下面的代码检索CONSUMER_GROUP的每个主题分区的最新(结束)偏移量
for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();
for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
long latestOffset = latestOffsets.get(e.getKey()).offset();
我的问题是,已提交和最新的偏移量因此是从两个不同的主题查询/请求的。已提交的偏移量是从__consumer_offsets主题请求的,最新的(结束)偏移量是从CONSUMER_GROUP的实际主题请求的。
(1) 上述关于请求提交和最新补偿的描述是否准确?
②是否可以直接查询__consumer_offsets题目?
谢谢你。
>
是的,你的理解是正确的。提交的偏移量存储在\uuuu consumer\u offset
主题中,而您需要查询特定分区以获取其末端偏移量。
是的_消费者_补偿
是一个常规话题,如果你想直接消费它。通过提供的API检索数据通常更容易,但如果您对其内容感兴趣,可以使用它。如果想了解如何反序列化数据,请查看控制台格式化程序。
我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端
Spring-Boot版本--2.2.6发行版 Spring-Kafka-2.3.7发行版 Kafka-客户端-2.3.1 阿帕奇-Kafka-Kafka2.12-2.3.1 我们有10个主题和50个分区,每个主题属于同一组,我们增加主题分区和用户计数在运行时根据负载。 自动提交=false 处理后同步提交每个偏移量 max-poll-records设置为1
我有一个kafkalistener,可以一次监听一批消息,如下所示 我的问题是,有没有一种方法可以监听多批消息并只提交一次。例如,如果我在Kafka主题中有1000条消息,我希望以10批的形式一次听100条消息,并在处理10批消息后提交偏移量。
我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。
我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1