当前位置: 首页 > 知识库问答 >
问题:

在Kafka 0.9中,有没有一种方法可以列出一个消费组中所有消费者的偏移量?

柯冯浩
2023-03-14

我正在使用Kafka 0.9新的消费者API。

我让Kafka为消费者负责补偿。我让消费者在多台机器上阅读相同的主题。

我试图找出以下内容:

  • 在消费者组中注册的消费者
  • 每个消费者的偏移量

我以为消费者-群体-消费者关系会存储在ZooKeeper中。我在ZooKeeper中看到了消费者节点,它没有孩子。

通过查看代码,我可以知道的偏移量被写入了kafka,但我不知道它们被写入了哪个主题?

共有3个答案

宰父正真
2023-03-14

如果kafka处理的偏移量没有存储在zookeeper中,它存储在kafka-logs文件夹中的主题调用“__consumer_offsets-#”中。

如果您想了解有关使用者组check bin/Kafka-consumer-groups . sh的更多信息,可以在执行poll()时通过检查KafkaRecords中的offset字段来找出每个使用者的偏移量

希望它有帮助!

滕渝
2023-03-14

正如@hba所提到的,编码/解码的详细信息可以在kafka.coordinator中找到。底部附近的GroupMetadataManager。查找readMessageKey和以下两个方法。基本上,您需要的是一系列呼叫,如

import org.apache.kafka.common.protocol.types.Type;
...
ByteBuffer bb = ByteBuffer.wrap(consumerRecord.key())
short version = bb.getShort();
String group = (String)Type.String.read(bb);
String topic = (String)Type.String.read(bb);
int partition = (int)Type.INT32.read(bb);

好处是org.apache.kafka.common.protocol.types.Type是Javaapi的一部分,独立于Big main Jar。难看的是,上面的代码片段并不完整。每个consumerRecord.key()consumerRecord.value()都有两个版本,其中一个必须模仿上述方法的解码。没什么大不了的,只是有点乏味。

如果您的项目可以依赖于卡拉罐,完整的Kafka罐和Kafka所需的一个或两个额外的罐子,那么您也可以使用组元数据管理器.readMessageKey(bb)和其他两种方法来读取键和值。至少在0.9.0.1中是它的公开版本。

阮炯
2023-03-14

@nautilus指出的__consumer_offsets topic中似乎至少存储了2种类型的键值对。

  1. 组元数据信息
  2. 偏移提交

据我所知,Kafka使用自己的模式和序列化。您可以通过查看< code > Kafka . coordinator . groupmetadatamanager 找到有关这些结构的更多信息:

  • GroupMetadataManager.OFFSET_COMMIT_KEY_SCHEMA
  • GroupMetadataManager.OFFSET_COMMIT_VALUE_SCHEMA_V0
  • GroupMetadataManager.GROUP_METADATA_KEY_SCHEMA
  • GroupMetadataManager.GROUP_METADATA_VALUE_SCHEMA_V0
 类似资料:
  • 例如,分区有1-10的偏移量。我只想从3-8消费。在消耗了第8条消息后,程序应该退出。

  • 我有一个多分区主题,由多个使用者(同一组)使用。我的目标是最大化消费处理,即任何消费者都可以消费来自任何分区的消息。 我知道这看起来是不可能的,因为只有一个消费者可以从一个分区中消费。 有没有可能使用REST代理来实现这一点?例如,轮询所有代理消费者实例。 谢了。

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 相反,我需要做的是将更改为新的内容,然后它将从最早的偏移量恢复。 会不会有其他的犯罪行为? 更新 根据我的理解,这看起来像是每次auto commit enable为false时,它都将提交偏移量。这是Camel Kafka组件的一个特性,因为即使启用了自动提交,它也将在x条消息之后同步

  • 当一个组中只有一个消费者,并且认为消费者无法在session.time.out内进行轮询时,将触发重新平衡,但是在这种情况下,组中只有一个消费者,现在假设session.time.out是30秒和消费者民意调查后50秒组协调员将识别消费者后50秒,并允许它提交偏移或协调员将断开消费者和没有偏移得到提交,并将重新平衡消费者与新的消费者标识?如果上次提交的偏移量是345678,在下一次轮询中,它处理了

  • 有以下消费者代码: 然后我用脚本生成消息: 问题是,当我将消费者作为两个不同的进程启动时,我会在每个进程中收到新消息。但是,我希望它只发送给一个消费者,而不是广播。 在Kafka的文献中(https://kafka.apache.org/documentation.html)其中写道: 如果所有使用者实例都具有相同的使用者组,则其工作原理就像在使用者之间平衡负载的传统队列一样。 我发现这些消费者的