注:使用kafka_2.11-0.9.0.1
我创建了一个Kafka主题,名为:消费者教程,有3个分区,如下所示:
C:\kafka_2.11-0.9.0.1>.\bin\windows\kafka-topics.bat --describe --topic consumer-tutorial
--zookeeper localhost:2181
Topic:consumer-tutorial PartitionCount:3 ReplicationFactor:1 Configs:
Topic: consumer-tutorial Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: consumer-tutorial Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: consumer-tutorial Partition: 2 Leader: 0 Replicas: 0 Isr: 0
创建主题后,我使用Producer API在每个分区中生成了一些数据,如下所示:
KafkaProducer<String,String> prod = new KafkaProducer<String,String>(props);
for(int i =0; i < 3; i++)
{
for(int x=start; x<end; x++)
{
prod.send(new ProducerRecord<String,String>("consumer-tutorial",i,Integer.toString(x),Integer.toString(rnd.nextInt(100)))); }
start=end;
end = start + 10;
}
prod.close();
现在,当我试图使用以下消费者API从本主题消费者教程中获取记录/消费消息时:
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList("consumer-tutorial"));
while(true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
for(ConsumerRecord<String,String> record: records)
{
System.out.printf("Key: %s, Value = %s", record.key(), record.value());
}
}
运行此代码时我没有得到任何记录。我检查了
记录
变量,但没有KEY: VALUE
对来自Poll
看“记录”什么也没显示
谁能帮我解释一下为什么我没有得到任何数据来显示。
NOTE: It works well when I have single partition topic.
我知道获取记录需要先进行几次迭代,但在我的情况下,循环是无限的,轮询不断。今天晚些时候我发现,当我用一些新名称重置组名(我使用UUID将组名随机化)时,它会使用不同的分区从主题中获取数据。因此得出结论,需要将偏移量重置为开头,以便使用者可以开始从开头提取数据,并保持组id每次都是新的,将偏移量指针保留在开头。现在我想知道如何设置主题开头的偏移量(而不是分区)。。我试着使用“代码”道具。put(ConsumerConfig.AUTO\u OFFSET\u RESET\u CONFIG,“最早”)'和〈code〉seek〈code〉和〈code〉seektobegining〈code〉,但除非消费者失去了偏移轨迹,否则所有这些都不起作用。。有什么方法可以把偏移指针设置到起点吗?
当你订阅主题时,内部会发生很多这样的操作1。找到组协调器2。发送组请求并获取组长3。领导者将确保任何一个加入组或离开组成员4。领导者将根据分配策略(Range/RoundRobin)将分区分配给成员。5.然后每个消费者成员从组协调员那里获取元数据。
然后它将获取记录。所以在最初的几次迭代中,当你投票时,你不会得到任何数据。
问题内容: 如何从代码中获取任何kafka主题的分区数。我研究了许多链接,但似乎没有一个起作用。 提及一些: http://grokbase.com/t/kafka/users/148132gdzk/find-topic-partition-count- through-simpleclient-api http://grokbase.com/t/kafka/users/151cv3htga/ge
我有一个带有2个分区的源主题,我正在用同一个应用程序启动2个kafka streams应用程序。id,但不同的接收器主题。 1) 这两个应用程序实例是否会从不同的分区接收数据? 2)如果其中一个应用程序被杀死,另一个实例会自动从两个实例中消耗吗? 3) 我如何证明上述情况?
我需要创建一个消费者,能够从多个主题拉和订单消息相对于时间戳(Kafka消息时间戳) 在本例中,我订阅了“主题A”和“主题B”,并按照时间戳的顺序对消息进行排队 现在,只要所有主题只有一个分区,这很容易用这个伪代码来解决: 当我为每个主题引入多个分区时,问题就出现了。显然,不可能将多个主题按时间顺序排序到一个流中,因为在一个主题中,顺序不能保证,只能在一个分区中,所以新的问题是将多个主题排序到具有
我们的生产Storm集群出现了一个我们无法解决的问题。 在某个时候,似乎kafka spout停止了从一半的主题分区中读取。有40个分区,它只读取其中的20个。在这种情况开始发生的时候,我们找不到我们对Storm星团或Kafka所做的任何改变。 我们更改了使用者组 ID,并将输出配置设置为它仍然只连接到相同的20个分区。我们已经查看了节点
根据阅读kafka connect文档: https://docs . confluent . io/5 . 3 . 3/connect/user guide . html #分布式模式 config . storage . topic =连接-配置 bin/kafka-topics --create --zookeeper localhost:2181 --topic connect-confi
受此启发,如何从DynamoDB获取每个主分区键的最新数据? 我在迪纳摩有一张桌子。它存储帐户统计信息。帐户统计数据可能每天更新几次。因此,表记录可能如下所示: ------------ -------------- ------- ------- |account_id|record_id|视图|星| ------------ -------------- ------- ------- |3|