当前位置: 首页 > 知识库问答 >
问题:

Kafka使用admin API提交和最后一次偏移

谢俊力
2023-03-14

我正在使用admin client API查询kafka broker,以使用以下代码获取CONSUMER_GROUP的提交偏移量:

Map<TopicPartition, OffsetAndMetadata> offsets =
                admin.listConsumerGroupOffsets(CONSUMER_GROUP)
                        .partitionsToOffsetAndMetadata().get();

上述代码将触发对特殊创建的_consumer_offsets主题的查询,以获取主题的每个分区(consumer_GROUP负责的分区)的提交偏移量。

另一方面,我使用下面的代码检索CONSUMER_GROUP的每个主题分区的最新(结束)偏移量

for(TopicPartition tp: offsets.keySet()) {
requestLatestOffsets.put(tp, OffsetSpec.latest());
}

Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
admin.listOffsets(requestLatestOffsets).all().get();

for (Map.Entry<TopicPartition, OffsetAndMetadata> e: offsets.entrySet()) {
long latestOffset = latestOffsets.get(e.getKey()).offset();

我的问题是,已提交和最新的偏移量因此是从两个不同的主题查询/请求的。已提交的偏移量是从__consumer_offsets主题请求的,最新的(结束)偏移量是从CONSUMER_GROUP的实际主题请求的。

(1) 上述关于请求提交和最新补偿的描述是否准确?

②是否可以直接查询__consumer_offsets题目?

谢谢你。

共有1个答案

公冶昆杰
2023-03-14

>

  • 是的,你的理解是正确的。提交的偏移量存储在\uuuu consumer\u offset主题中,而您需要查询特定分区以获取其末端偏移量。

    是的_消费者_补偿是一个常规话题,如果你想直接消费它。通过提供的API检索数据通常更容易,但如果您对其内容感兴趣,可以使用它。如果想了解如何反序列化数据,请查看控制台格式化程序

  •  类似资料:
    • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

    • Spring-Boot版本--2.2.6发行版 Spring-Kafka-2.3.7发行版 Kafka-客户端-2.3.1 阿帕奇-Kafka-Kafka2.12-2.3.1 我们有10个主题和50个分区,每个主题属于同一组,我们增加主题分区和用户计数在运行时根据负载。 自动提交=false 处理后同步提交每个偏移量 max-poll-records设置为1

    • 我有一个kafkalistener,可以一次监听一批消息,如下所示 我的问题是,有没有一种方法可以监听多批消息并只提交一次。例如,如果我在Kafka主题中有1000条消息,我希望以10批的形式一次听100条消息,并在处理10批消息后提交偏移量。

    • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

    • 我有一个用户轮询从订阅的主题。它消耗每条消息并进行一些处理(在几秒内),推送到不同的主题并提交偏移量。 总共有5000条信息, 重新启动前-消耗2900条消息和提交的偏移量 kafka版本(strimzi)>2.0.0 kafka-python==2.0.1