注:使用的Kafka版本:Kafka2.11-0.9.0.1
我有一个名为测试Kafka的主题,它是三个分区和一个复制因子,这个主题在每个分区中都有一些字符串数据,即键和值对。
当我试图通过消费者api获取记录时,消费者。轮询函数不获取任何记录并给出记录。count=0,但是,当我使用相同的逻辑时,只有一个单分区,那么它工作良好并获取记录。
我以为Kafka会在内部处理多个分区,并从每个分区中逐个获取记录,我只需要订阅一个主题。
有人能帮忙吗?
参考代码段:
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(topic));
while(true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println(records.count());
for(ConsumerRecord<String,String> record: records)
{
System.out.printf("Key: %s, Value = %s", record.key(), record.value());
}
}
上述代码在记录计数中返回“零”。
尝试使用更长的轮询间隔。我尝试了100毫秒的主题,但在检索到任何消息之前,通话超时。改成1000毫秒对我来说很管用。
假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
有一个16个分区的Kafka主题 使用给定的消费者组名称,我们目前正在启动单个消费者来阅读该主题。 > 单个消费者是否从该主题的(仅)读取?如果带有消息为空,消费者是否从下一个分区开始读取(...等等)? 我们可以选择启动多个消费者(使用相同的消费者组名称)来读取相同的主题(有16个分区)。为了并行读取多个分区,可以维护多少消费者?
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
我有一个Kafka主题,并为其附加了1个消费者(主题只有1个分区)。现在对于超时,我使用默认值(心跳:3秒,会话超时:10秒,轮询超时:5分钟)。 根据留档,轮询超时定义消费者必须在其他代理将该消费者从消费者组中删除之前处理消息。现在假设,消费者只需1分钟即可完成处理消息。 现在我有两个问题
我正在开发一个spring boot kafka消费者应用程序。它将有不同的消费者在不同的主题上工作。使用者的所有信息都来自application.yml文件。 我无法将应用程序属性中的主题列表设置到KafKalistener。 在这两种情况下,我都得到以下错误: java.lang.IllegalArgumentException:无法解析占位符 从应用程序属性获取主题并将其设置在KafkaLi