当前位置: 首页 > 知识库问答 >
问题:

为什么Kafka Consumer API不从主题的多个分区获取数据

秦雅逸
2023-03-14

注:使用kafka_2.11-0.9.0.1

我创建了一个Kafka主题,名为:消费者教程,有3个分区,如下所示:

C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --describe --topic consumer-tutorial
--zookeeper localhost:2181
Topic:consumer-tutorial PartitionCount:3        ReplicationFactor:1     Configs:
        Topic: consumer-tutorial        Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: consumer-tutorial        Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: consumer-tutorial        Partition: 2    Leader: 0       Replicas: 0     Isr: 0

创建主题后,我使用Producer API在每个分区中生成了一些数据,如下所示:

KafkaProducer<String,String> prod = new KafkaProducer<String,String>(props); 

for(int i =0; i < 3; i++)
{
    for(int x=start; x<end; x++)
    {
        prod.send(new ProducerRecord<String,String>("consumer-tutorial",i,Integer.toString(x),Integer.toString(rnd.nextInt(100))));     }
    start=end;
    end = start + 10;
}
prod.close();

现在,当我试图使用以下消费者API从本主题消费者教程中获取记录/消费消息时:

KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("consumer-tutorial"));

while(true)
{
    ConsumerRecords<String, String> records = consumer.poll(100);

    for(ConsumerRecord<String,String> record: records)
    {
        System.out.printf("Key: %s, Value = %s", record.key(), record.value());
    }

}

运行此代码时我没有得到任何记录。我检查了记录变量,但没有KEY: VALUE对来自Poll

看“记录”什么也没显示

谁能帮我解释一下为什么我没有得到任何数据来显示。

NOTE: It works well when I have single partition topic.

共有2个答案

夏昊
2023-03-14
匿名用户

我知道获取记录需要先进行几次迭代,但在我的情况下,循环是无限的,轮询不断。今天晚些时候我发现,当我用一些新名称重置组名(我使用UUID将组名随机化)时,它会使用不同的分区从主题中获取数据。因此得出结论,需要将偏移量重置为开头,以便使用者可以开始从开头提取数据,并保持组id每次都是新的,将偏移量指针保留在开头。现在我想知道如何设置主题开头的偏移量(而不是分区)。。我试着使用“代码”道具。put(ConsumerConfig.AUTO\u OFFSET\u RESET\u CONFIG,“最早”)'和〈code〉seek〈code〉和〈code〉seektobegining〈code〉,但除非消费者失去了偏移轨迹,否则所有这些都不起作用。。有什么方法可以把偏移指针设置到起点吗?

太叔景同
2023-03-14

当你订阅主题时,内部会发生很多这样的操作1。找到组协调器2。发送组请求并获取组长3。领导者将确保任何一个加入组或离开组成员4。领导者将根据分配策略(Range/RoundRobin)将分区分配给成员。5.然后每个消费者成员从组协调员那里获取元数据。

然后它将获取记录。所以在最初的几次迭代中,当你投票时,你不会得到任何数据。

 类似资料:
  • 问题内容: 如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。 提及一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count- through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/ge

  • 我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?

  • 我需要创建一个消费者,能够从多个主题拉和订单消息相对于时间戳(Kafka消息时间戳) 在本例中,我订阅了“主题A”和“主题B”,并按照时间戳的顺序对消息进行排队 现在,只要所有主题只有一个分区,这很容易用这个伪代码来解决: 当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间顺序排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有

  • 我们的生产Storm集群出现了一个我们无法解决的问题。 在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。 我们更改了使用者组 ID,并将输出配置设置为它仍然只连接到相同的20个分区。我们已经查看了节点

  • 根据阅读kafka connect文档: https://docs . confluent . io/5 . 3 . 3/connect/user guide . html #分布式模式 config . storage . topic =连接-配置 bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-confi

  • 受此启发,如何从DynamoDB获取每个主分区键的最新数据? 我在迪纳摩有一张桌子。它存储帐户统计信息。帐户统计数据可能每天更新几次。因此,表记录可能如下所示: ------------ -------------- ------- ------- |account_id|record_id|视图|星| ------------ -------------- ------- ------- |3|