当前位置: 首页 > 面试题库 >

如何在Kafka中使用多个消费者?

邰棋
2023-03-14
问题内容

我是一名学习Kafka的新学生,在了解多个消费者(到目前为止,文章,文档等对他们没有太大帮助)方面,我遇到了一些基本问题。

我尝试做的一件事是编写我自己的高级Kafka生产者和消费者并同时运行它们,将100条简单消息发布到某个主题,然后让消费者检索它们。我已经成功地做到了这一点,但是当我尝试引入另一个使用者来使用与刚刚发布消息的主题相同的主题时,它没有收到消息。

据我了解,对于每个主题,您可以拥有来自不同消费者组的消费者,并且这些消费者组中的每一个都将获得针对某个主题生成的消息的完整副本。这样对吗?如果没有,对我来说建立多个消费者的正确方法是什么?到目前为止,这是我编写的消费者类:

public class AlternateConsumer extends Thread {
    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final Boolean isAsync = false;

    public AlternateConsumer(String topic, String consumerGroup) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", consumerGroup);
        properties.put("partition.assignment.strategy", "roundrobin");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }


    public void run() {
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(0);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }

    }
}

此外,我注意到最初我只是在单个分区上测试主题“ test”的上述消耗。当我将另一个消费者添加到现有的消费者组(例如“
testGroup”)时,这触发了Kafka重新平衡,从而使我的消费延迟显着降低了几秒钟。我以为这是重新平衡的问题,因为我只有一个分区,但是当我创建一个新的主题“
multipartpartitions”(例如6个分区)时,出现了类似的问题,即向同一消费者组添加更多消费者会导致延迟问题。我环顾四周,人们告诉我,我应该使用多线程使用者-
有人可以阐明这一点吗?


问题答案:

我认为您的问题出在auto.offset.reset属性上。当新使用者从分区读取并且没有先前的已提交偏移量时,将使用auto.offset.reset属性来确定起始偏移量。如果将其设置为“最大”(默认),则从最新(最后)消息开始阅读。如果将其设置为“最小”,则会收到第一条可用消息。

因此添加:

properties.put("auto.offset.reset", "smallest");

然后再试一次。

编辑

“最小”和“最大”在不久前已被弃用。您现在应该使用“最早”或“最新”。有任何疑问,请检查文档



 类似资料:
  • 我是一个学习Kafka的新学生,我遇到了一些关于理解多个消费者的基本问题,到目前为止,文章、文档等都没有太大的帮助。 我尝试做的一件事是编写我自己的高级Kafka生产者和消费者,并同时运行他们,发布100个简单的消息到一个主题,并让我的消费者检索他们。我成功地做到了这一点,但是当我试图引入第二个消费者来消费刚刚发布消息的同一主题时,它没有收到任何消息。 我的理解是,对于每个主题,您可以有来自不同消

  • Kafka的doc给出了一种方法,大约用以下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 我的代码: 但它不起作用,引发了一个异常: JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全 此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我

  • TL;DR;我试图理解一个被分配了多个分区的单个使用者是如何处理reach分区的消费记录的。 例如: 在移动到下一个分区之前,会完全处理一个分区。 每次处理每个分区中的可用记录块。 从第一个可用分区处理一批N条记录 以循环旋转方式处理来自分区的N条记录 我找到了或分配程序的配置,但这只决定了使用者如何分配分区,而不是它如何从分配给它的分区中使用。 我开始深入研究KafkaConsumer源代码,#

  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

  • 我有多个制作人,可以向一个Kafka主题发送多种类型的事件。 我有一个消费者,它必须消费所有类型的消息。每种类型的消息都有不同的逻辑。 但在这种情况下,所有消息都指向此方法,不仅是EventOne 如果我实现了两种方法(对于每种类型的消息),那么所有消息都只能使用一种方法。 如果我像这样实现监听器: 然后我得到一个例外:org。springframework。Kafka。KafkaListener

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性