当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者与JAVA

太叔高义
2023-03-14

我有一个Kafka代理,有多个主题,每个主题都有一个分区。

我有一个消费者,它可以很好地使用主题中的消息

我的问题是,我需要通过增加分区的数量来提高消息队列的吞吐量,比如说,我在一个主题上有四个分区,有没有一种方法可以让我编写四个消费者,每个消费者都指向该主题上的各个分区???

import java.util.*;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
   private ConsumerConnector consumerConnector = null;
   private final String topic = "mytopic";

   public void initialize() {
         Properties props = new Properties();
         props.put("zookeeper.connect", "localhost:2181");
         props.put("group.id", "testgroup");
         props.put("zookeeper.session.timeout.ms", "400");
         props.put("zookeeper.sync.time.ms", "300");
         props.put("auto.commit.interval.ms", "1000");
         ConsumerConfig conConfig = new ConsumerConfig(props);
         consumerConnector = Consumer.createJavaConsumerConnector(conConfig);
   }

   public void consume() {
         //Key = topic name, Value = No. of threads for topic
         Map<String, Integer> topicCount = new HashMap<String, Integer>();       
         topicCount.put(topic, new Integer(1));

         //ConsumerConnector creates the message stream for each topic
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
               consumerConnector.createMessageStreams(topicCount);         

         // Get Kafka stream for topic 'mytopic'
         List<KafkaStream<byte[], byte[]>> kStreamList =
                                              consumerStreams.get(topic);
         // Iterate stream using ConsumerIterator
         for (final KafkaStream<byte[], byte[]> kStreams : kStreamList) {
                ConsumerIterator<byte[], byte[]> consumerIte = kStreams.iterator();

                while (consumerIte.hasNext())
                       System.out.println("Message consumed from topic
                                     [" + topic + "] : "       +
                                       new String(consumerIte.next().message()));              
         }
         //Shutdown the consumer connector
         if (consumerConnector != null)   consumerConnector.shutdown();          
   }

   public static void main(String[] args) throws InterruptedException {
         KafkaConsumer kafkaConsumer = new KafkaConsumer();
         // Configure Kafka consumer
         kafkaConsumer.initialize();
         // Start consumption
         kafkaConsumer.consume();
   }

}

共有1个答案

邢炯
2023-03-14

基本上,您需要做的就是启动多个消费者,这些消费者都属于同一个消费者组。如果您使用的是kafka 0.9或更高版本的新使用者,或者如果您使用的是高级使用者,kafka将负责划分分区,确保每个分区由一个使用者读取。如果分区多于使用者,则某些使用者将从多个分区接收消息,但同一使用者组中的多个使用者不会读取任何分区,以确保消息不重复。因此,您永远不希望消费者多于分区,因为有些消费者将处于空闲状态。您还可以使用简单使用者微调哪个使用者读取每个分区https://cwiki.apache.org/confluence/display/KAFKA/0.8.0SimpleConsumer示例

您似乎正在使用Kafka0.8或更早版本的旧消费者。你可能想考虑转换到新的消费者。http://kafka.apache.org/documentation.html#intro_consumers

下面是另一篇很好的文章,详细介绍了如何使用新消费者编写消费者:http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。

  • 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前,使用Ctrl-C关闭zookeeper和kafka服务(这是通过在consumer方法中使用来模拟的)。 发现 在zookeeper和kafka服务被关闭后,消费者继续在控制台上写消息。 问题 我如何使消费者从上次消费的消息的索引+1继续。 向Kafka推送100,000条消息 在使用者使用所有100,000条消息之前

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认