我从Apache Kakfa开始,用Java开发一个简单的生产者、消费者应用程序。我正在使用kafka客户端,并在Mac上运行它。
我创建了一个名为“replicated\u topic\u partitioned”的主题,其中包含3个分区,复制因子为3。
我在2181港启动了动物园管理员。我在端口9092、9093和9094上分别启动了三个id为1、2和3的代理。
下面是descripe命令的输出
kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092
Topic:replicated_topic_partitioned PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: replicated_topic_partitioned Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: replicated_topic_partitioned Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated_topic_partitioned Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
我编写了一个简单的生产者和一个消费者代码。生产者成功运行并发布了消息。但是当我启动消费者时,轮询调用只是无限期等待。在调试时,我发现它一直在消费者网络客户端的waitMetadataUpdate方法处循环。
这是生产者和消费者的代码
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";
int numberOfRecords = 10;
try {
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
myProducer.close();
}
Consumer.java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");
KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);
String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));
try {
while (true){
ConsumerRecords<String, String> records = myConsumer.poll(1000);
printRecords(records);
}
} finally {
myConsumer.close();
}
从server.properties
添加一些键字段
broker.id=1
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
服务器。其他两个经纪人的财产是上述经纪人财产的复制品。id、端口和日志。目录已更改。
这对我不起作用:Kafka0.9.0.1Java消费者陷入等待MetadataUpdate()
但是,如果我从传递分区的命令行启动消费者,它会成功读取该分区的消息。但是当只指定了一个主题时,它不会收到任何消息。
作品:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning --partition 1
不工作:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning
注意:对于复制因子等于1的主题,上述消费者非常有效。
问题:
>
为什么Java Producer没有为复制因子不止一个的主题读取任何消息(即使将其分配给分区时也是如此)(如myConsumer.assign(Collections.singletonList(新的TopicPartition(主题,2))之类)?
为什么控制台使用者仅在传递分区时读取消息(同样适用于复制因子为1的主题)
免责声明:这不是答案。
Java使用者现在正在按预期工作。我没有对代码或配置做任何更改。我唯一做的就是重启我的Mac电脑。这导致Kafka日志文件夹(我想还有zookeeper文件夹)被删除。
我重新创建了主题(使用相同的命令-3个分区,复制因子为3)。然后用相同的配置重新启动代理-没有advertised.host.name
或advertised.port
配置。
因此,kafka日志和主题的重新创建修复了之前导致问题的东西。
我唯一的怀疑是一个未正确终止的消费者。我运行消费者代码时,最初没有在finally块中对消费者进行关闭调用。我也有同样的小组。id 。也许,所有3个分区都分配给了未正确终止或关闭的使用者。这只是猜测。。
但甚至打电话给我的消费者。position(new TopicPartition(topic,2))在我之前将使用者分配到分区时没有返回响应。它在相同的waitingmetadataupdate方法中循环。
所以,您发送了10条记录,但所有10条记录都有相同的密钥:
for (int i = 0; i < numberOfRecords; i++) {
String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
System.out.println("Sending " + message);
myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}
除非另有说明(通过直接在ProducerRecord上设置分区),记录传递到的分区由以下内容确定:
分区=MURRU2(序列化(键))%n分区
所以相同的键意味着相同的分区。
您是否尝试过在分区0和2上搜索10条记录?
如果您想在分区之间更好地“传播”记录,请使用空键(您会得到循环)或可变键。
本文向大家介绍解释术语“主题复制因子”。相关面试题,主要包含被问及解释术语“主题复制因子”。时的应答技巧和注意事项,需要的朋友参考一下 答:在设计Kafka系统时,考虑主题复制是非常重要的。
按照此处提到的解决方案,kafka-mire-maker-fasting-to-复制-消费者-偏移-主题。我能够跨DC1(Live Kafka集群)和DC2(Backup Kafka集群)集群启动镜像制作器而没有任何错误。 看起来它还能够从DC1集群跨DC2集群同步主题。 问题 如果我关闭 DC1 的使用者并将同一使用者(同一group_id)指向 DC2,即使镜像制造商能够同步本主题和分区的偏移
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:
我已经编写了一个streams应用程序,用于在由5个代理和10个分区组成的集群上与主题对话。我在这里尝试了多种组合,比如10个应用程序实例(在10台不同的机器上),每个实例有1个流线程,5个实例每个实例有2个线程。但由于某种原因,当我签入kafka manager时,分区和流线程之间的1:1映射没有发生。一些线程正在拾取2个分区,而一些线程没有拾取任何分区。你能帮我做同样的事吗??所有线程都是同一