当前位置: 首页 > 知识库问答 >
问题:

Kafka——消费者阅读流的一半

骆文彬
2023-03-14

我使用以下代码来读取主题的数据,即“sha-test2”,但它正在读取完全替代的代码行,即 10 行中的 20 行。但是当我运行控制台时,它显示所有 20 行。即.bin/kafka-console-consumer.sh --zookeeper 本地主机:2181 --主题 sha-test2 --从头

我做错了什么?非常感谢您的帮助。

public class KafkaTestConsumer extends  Thread {
    //final static String clientId = "SimpleConsumerDemoClient";
    final static String TOPIC = "sha-test2";
    ConsumerConnector consumerConnector;

    public static void main(String[] argv) throws   
     UnsupportedEncodingException {
        KafkaTestConsumer helloKafkaConsumer = new KafkaTestConsumer();
        helloKafkaConsumer.start();
    }
    public KafkaTestConsumer(){
        Properties properties = new Properties();
        properties.put("zookeeper.connect","172.23.32.35:2181");
        properties.put("group.id","test-group");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        consumerConnector = 
         Consumer.createJavaConsumerConnector(consumerConfig);
    }


    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =  
         consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(TOPIC).get(0);
        System.out.println("consumerMap : \n " + consumerMap.toString() );
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

       System.out.println("run started");
        while(it.hasNext()){
            System.out.println(new String(it.next().message()));
        }
}

Thank you.
~Shyam

共有2个答案

景宏富
2023-03-14

你的代码看起来非常好。这看起来像一个偏移问题。高级消费者将其偏移量存储在动物园管理员中。

在您的案例中,可能会发生以下情况:-1。您在kafka中放入了10条消息2。您运行了消费者代码,它成功地读取了所有10条消息。此外,消费者在zookeeper中将消耗的偏移量更新为10。3.你阻止了你的消费者。4.你又向kafka发送了10条消息5.你再次启动消费者代码。它只读取最后10条消息,而不是之前推送的10条消息。因为当您重新启动消费者时,它会检查动物园管理员,以找出恢复消费的偏移量。

尝试使用不同的组 ID 重新运行您的使用者,或者尝试在从 zookeeper 中删除偏移量后尝试。它应该可以正常工作。

 properties.put("group.id","test-group420");
骆鸿运
2023-03-14

问题就在这一行:

topicCountMap.put(TOPIC, new Integer(1));

您告诉< code>consumerConnector为您的主题创建一个消费者线程,但是该主题(显然)有两个分区。< code >“test-group”组中消费者线程的数量应该等于或大于分区的数量,否则一些分区将不会被该组读取,这正是您的情况。

请看一下这个例子,其中的线程数是通过命令行参数设置的。

或者,您可以在/brokers/topic/your_topic_name/partitions节点下从Zookeeper中读取存储元数据的分区的确切数量。

 类似资料:
  • 我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM

  • 我对Kafka相对来说是新的,我试图在主题上发送消息后产生消费者。 单个生产者在不同的分区上发送200个msg。 我多次运行消费者脚本。

  • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

  • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

  • 我们使用的是spring集成kafka版本3.1.2。RELEASE and int kafka:消息驱动的通道适配器,用于使用来自远程kafka主题的消息。生产者发送加密消息,我们使用反序列化器解密实际消息。我们可以使用主题中发布的所有消息。我们将自动提交用作false。我们想知道在成功处理消息后如何从我们的服务提交或确认消息。有人能帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?

  • 我正在使用spring boot构建一个web应用程序,现在我需要接收实时通知。我正计划使用apache kafka作为这方面的消息代理。要求用户具有不同的角色,并且根据角色,他们应该接收其他用户正在执行的操作的通知。 我设置了一个生产者和消费者,作为消费者,我可以接收发布到一个主题的信息,比如说topic1。 我遇到的问题是,我可以让多个用户收听同一个主题,而每个用户都应该得到发布到该主题的消息