我们有一个代码来获取kafka主题的消费者的一些细节。下面的代码显示了如何获得分区和相应的偏移量。我们需要的缺失信息是客户组中分区的客户id/客户。我们有办法得到每个主题分区的消费者吗?
ArrayList<TopicPartition> partitions = new ArrayList<TopicPartition>();
ArrayList<OffsetAndMetadata> offsets = new ArrayList<OffsetAndMetadata>();
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
TopicPartition partitiontemp = new TopicPartition(topic, i);
partitions.add(partitiontemp);
OffsetAndMetadata offsettemp = consumer.committed(partitiontemp);
offsets.add(offsettemp);
}
consumer.assign(partitions);
consumer.seekToEnd(partitions);
for (int i=0;i<consumer.partitionsFor(topic).size();i++)
{
try {
long cur_offset = offsets.get(partitions.get(i).partition()).offset();
long log_offset = consumer.position(partitions.get(partitions.get(i).partition()));
System.out.printf("Topic: %s partitionID: %d current offset: %d log offset: %d uncommitted: %d\n",
topic, partitions.get(i).partition(),cur_offset , log_offset , log_offset - cur_offset);
}catch (Exception ex){
System.out.printf("Topic: %s partitionID: %d current offset: - log offset: - uncommitted: -\n", topic, partitions.get(i).partition());
}
}
您可能是指消费者 group.id
属性,因为偏移量是每个组,也不是每个 client.id
。此讨论 stackoverflow.com/questions/55937806/apache-kafka-get-list-of-consumers-on-a-specific-topic/55938325
可以回答你的问题。
我有一个带有15个分区的kafka主题[0-14],我正在运行带有5个并行的flink。因此,理想情况下,每个并行flink使用者应该分别使用3个分区。但即使在多次重启之后,很少有Kafka分区不被任何flink工人订阅。 注意:如果我以1个并行度开始作业,则作业工作非常好。 Flink版本:1.3.3
我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?
Kafka-来自同一组的多个使用者分配了相同的分区 我刚刚开始学习Kafka和诺德。我已经写了一篇关于消费者的文章如下 输出 有四个分区。 编辑 我使用了,如下所示。 生产者正在发送100条消息,收到的消息如下。这就是我如何知道分配的分区(不是从对象)。 当我运行两个这样的使用者实例(相同的主题和组)时,其中只有一个接收来自分区0的所有内容。这不是问题吗? 这是生产商代码。
我正在实现一个自定义消费者的主题/分区分配在Kafka。为此,我将重写抽象类,该类又实现接口。 作为自定义赋值器的一部分,我希望发送一个关于消费者订阅的每个主题的每个分区的单个(浮动)信息。 我知道可以通过重写接口的默认方法向赋值器发送自定义数据。 但是,问题是,从上面的方法签名中,我无法获得为使用者注册的每个主题分配给带下划线使用者的分区列表。 谢谢你。
我们希望在读取消息表单kafka时实现并行性。因此我们想在flinkkafkaconsumer中指定分区号。它将从kafka中的所有分区读取消息,而不是特定的分区号。以下是示例代码: 请建议任何更好的选择来获得并行性。
我有10个消费者和10个分区。我取分区数 并且使用相同的group.id创建相同数量的消费者。 我也发现很少这样的日志->