当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者API工作不正常

楚嘉胜
2023-03-14

我是一个新的Kafka。我开始做Kafka,我面临以下问题,请帮助我解决这一个,提前谢谢。首先,我正在编写生产者API,它工作良好,但在编写消费者API时,消息不会显示。

我的代码是这样的:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;


public class ConsumerGroup {
    public static void main(String[] args) throws Exception {

        String topic = "Hello-Kafka";
        String group = "myGroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXX.XX.XX.XX:9092");
        props.put("group.id", group);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {

            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);


            ConsumerRecords<String, String> records = consumer.poll(100);

            System.out.println("records ::" + records);
            System.out.println(records.toString());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Record::" + record.offset());
                System.out.println(record.key());
                System.out.println(record.value());
            }
            consumer.commitSync();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}

已订阅主题Hello-Kafka records::org.apache.kafka.clients.consumer.consumerRecords@76b0bfab org.apache.kafka.clients.consumer.consumerRecords@76b0bfab

这里不打印偏移量,键,值控件不来for(ConsumerRecord:records){那个for循环它自己请帮帮我。

共有1个答案

边意
2023-03-14

您正在尝试打印空记录,因此在代码中只打印records.toString(),这实质上是类的名称。
我对代码做了一些更改,使其正常工作。如果有帮助,请看一下。

public class ConsumerGroup {
    public static void main(String[] args) throws Exception {

        String topic = "Hello-Kafka";
        String group = "myGroup";
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:9092");
        props.put("group.id", group);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        try {

            consumer.subscribe(Arrays.asList(topic));
            System.out.println("Subscribed to topic " + topic);

            while(true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if(records.isEmpty()){

                }
                else{
                System.out.println("records ::" + records);
                System.out.println(records.toString());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Record::" + record.offset());
                    System.out.println(record.key());
                    System.out.println(record.value());
                }
                consumer.commitSync();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}
 类似资料:
  • 我发现Kafka有一些非常奇怪的地方。 我有一个制片人,有三个经纪人: 然后,我尝试使用新的API运行消费者: 我什么都没有!但是如果我使用旧的API: 我收到我的留言了! 我怎么了? PS:我用的是Kafka10

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 谁能请解释和指导我链接或资源阅读关于Kafka消费者如何在下面的场景下工作。 > 一个有5个消费者的消费者组和3个分区的主题(Kafka是如何决定的) 一个消费者组有5个消费者,主题有10个分区(kafka如何分担负载) 两个消费者组和两个服务器的kafka集群,其中一个主题被划分在节点1和节点2之间,当来自不同组的消费者订阅到一个分区时,如何避免重复。 上面可能不是配置kafka时的最佳实践,但

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用