当前位置: 首页 > 知识库问答 >
问题:

Kafka 0.10Java消费者没有从主题阅读消息

关志
2023-03-14

我有一个简单的java制作人,如下所示

public class Producer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for(int i=0;i<3000;i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get();
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("compression.codec", "snappy");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<String, byte[]>(props);
    }
}

我正在尝试读取如下数据

public class Consumer 
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;   
        int noRecordsCount = 0;
        int stopCount = 1000;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count()==0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }


            consumerRecords.forEach(record -> {
               // Process the record System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
            });

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                                    BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,
                                    "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put("enable.auto.commit", "false");

        // Create the consumer using props.
        final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

但消费者并没有从Kafka那里读到任何信息。如果我在start()处添加以下内容

consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());

然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。

有人可以让我知道出了什么问题,我该如何解决这个问题吗?

Kafka代理和zookeeper使用默认配置运行。

共有1个答案

宗涵蓄
2023-03-14

您对commitSync()的调用是确认最后一次轮询()中批处理中的所有消息,而不是在处理它们时确认每个单独的消息,我认为您正在尝试这样做。

从文档

“上面的示例使用commitSync将所有收到的记录标记为已提交。在某些情况下,您可能希望通过显式指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交偏移量。

 try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

注意:提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,当调用commitSync(偏移量)时,您应该在最后一条处理的消息的偏移量上加一个。“

 类似资料:
  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk

  • 我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • 我对Kafka和Spring Boot是一种新的体验,并试图使我的应用程序从主题的特定分区读取。 单厂代码 这也是我的消费者工厂配置 当我试图运行程序时,它给我一个错误 分区Single上的偏移量提交失败。偏移量308处的Attendance-0:协调器不知道此成员。 和警告 失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.p