当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka Java使用者未收到复制因子大于1的主题的消息

景稳
2023-03-14

我从Apache Kakfa开始,用Java开发一个简单的生产者、消费者应用程序。我正在使用kafka客户端,并在Mac上运行它。

我创建了一个名为“replicated\u topic\u partitioned”的主题,其中包含3个分区,复制因子为3。

我在2181港启动了动物园管理员。我在端口9092、9093和9094上分别启动了三个id为1、2和3的代理。

下面是descripe命令的输出

kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092    
Topic:replicated_topic_partitioned    PartitionCount:3    ReplicationFactor:3    Configs:segment.bytes=1073741824
     Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
     Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
     Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1

我编写了一个简单的生产者和一个消费者代码。生产者成功运行并发布了消息。但是当我启动消费者时,轮询调用只是无限期等待。在调试时,我发现它一直在消费者网络客户端的waitMetadataUpdate方法处循环。

这是生产者和消费者的代码

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";

int numberOfRecords = 10;
try {
    for (int i = 0; i < numberOfRecords; i++) {
       String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
       System.out.println("Sending " + message);
       myProducer.send(new ProducerRecord<String, String>(topic, message));

    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    myProducer.close();
}

Consumer.java

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));

try {
    while (true){
        ConsumerRecords<String, String> records = myConsumer.poll(1000);
         printRecords(records);
    }
 } finally {
     myConsumer.close();
 }

server.properties添加一些键字段

broker.id=1 
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

服务器。其他两个经纪人的财产是上述经纪人财产的复制品。id、端口和日志。目录已更改。

这对我不起作用:Kafka0.9.0.1Java消费者陷入等待MetadataUpdate()

但是,如果我从传递分区的命令行启动消费者,它会成功读取该分区的消息。但是当只指定了一个主题时,它不会收到任何消息。

作品:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 
     --from-beginning --partition 1

不工作:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 
    --from-beginning 

注意:对于复制因子等于1的主题,上述消费者非常有效。

问题:

>

  • 为什么Java Producer没有为复制因子不止一个的主题读取任何消息(即使将其分配给分区时也是如此)(如myConsumer.assign(Collections.singletonList(新的TopicPartition(主题,2))之类)?

    为什么控制台使用者仅在传递分区时读取消息(同样适用于复制因子为1的主题)

  • 共有2个答案

    徐奇
    2023-03-14

    免责声明:这不是答案。

    Java使用者现在正在按预期工作。我没有对代码或配置做任何更改。我唯一做的就是重启我的Mac电脑。这导致Kafka日志文件夹(我想还有zookeeper文件夹)被删除。

    我重新创建了主题(使用相同的命令-3个分区,复制因子为3)。然后用相同的配置重新启动代理-没有advertised.host.nameadvertised.port配置。

    因此,kafka日志和主题的重新创建修复了之前导致问题的东西。

    我唯一的怀疑是一个未正确终止的消费者。我运行消费者代码时,最初没有在finally块中对消费者进行关闭调用。我也有同样的小组。id 。也许,所有3个分区都分配给了未正确终止或关闭的使用者。这只是猜测。。

    但甚至打电话给我的消费者。position(new TopicPartition(topic,2))在我之前将使用者分配到分区时没有返回响应。它在相同的waitingmetadataupdate方法中循环

    景星光
    2023-03-14

    所以,您发送了10条记录,但所有10条记录都有相同的密钥:

    for (int i = 0; i < numberOfRecords; i++) {
       String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
       System.out.println("Sending " + message);
       myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
    }
    

    除非另有说明(通过直接在ProducerRecord上设置分区),记录传递到的分区由以下内容确定:

    分区=MURRU2(序列化(键))%n分区

    所以相同的键意味着相同的分区。

    您是否尝试过在分区0和2上搜索10条记录?

    如果您想在分区之间更好地“传播”记录,请使用空键(您会得到循环)或可变键。

     类似资料:
    • 本文向大家介绍解释术语“主题复制因子”。相关面试题,主要包含被问及解释术语“主题复制因子”。时的应答技巧和注意事项,需要的朋友参考一下 答:在设计Kafka系统时,考虑主题复制是非常重要的。

    • 按照此处提到的解决方案,kafka-mire-maker-fasting-to-复制-消费者-偏移-主题。我能够跨DC1(Live Kafka集群)和DC2(Backup Kafka集群)集群启动镜像制作器而没有任何错误。 看起来它还能够从DC1集群跨DC2集群同步主题。 问题 如果我关闭 DC1 的使用者并将同一使用者(同一group_id)指向 DC2,即使镜像制造商能够同步本主题和分区的偏移

    • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

    • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

    • 我已经编写了一个streams应用程序,用于在由5个代理和10个分区组成的集群上与主题对话。我在这里尝试了多种组合,比如10个应用程序实例(在10台不同的机器上),每个实例有1个流线程,5个实例每个实例有2个线程。但由于某种原因,当我签入kafka manager时,分区和流线程之间的1:1映射没有发生。一些线程正在拾取2个分区,而一些线程没有拾取任何分区。你能帮我做同样的事吗??所有线程都是同一