当前位置: 首页 > 知识库问答 >
问题:

KAFKA中的几个消费者的循环不起作用

子车才捷
2023-03-14

我有5个独立的docker图像:1个用于kafka经纪人,1个动物园管理员,1个生产者和2个消费者。我通过生产者向主题发布消息。基本上,我希望消息将在循环算法中使用,因此,为此,我使用相同的< code>group.id定义了消费者,并将< code > partition . assignment . strategy 的配置添加为< code > org . Apache . Kafka . clients . consumer . roundrobinsignator ,

但我发现只有一个消费者收到了所有的消息。

我的生产者代码:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;


public class DiscoveryKafkaProducer{
   Producer<String, String> producer;

   public DiscoveryKafkaProducer(Properties configs) {
   producer = new KafkaProducer<String, String>(configs);   
}

   public void send(String topic, List<String> records) {
   for(String record: records){
       producer.send(new ProducerRecord<String, String>(topic, record));
   }
   producer.flush();
}

我的消费者代码:

public static void main(String[] args) {
    String server = "lshraga-ubuntu-sp-nac:9092";
    Properties consumerConfigs = new Properties();
    consumerConfigs.put("bootstrap.servers", server);
    consumerConfigs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfigs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerConfigs.put("group.id", "discovery");
    consumerConfigs.put("client.id", "discovery");
    consumerConfigs.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    List<String> list = new ArrayList<String>();
    DiscoveryKafkaConsumer consumer1 = new DiscoveryKafkaConsumer(Collections.singletonList(topicName), consumerConfigs);
    
     try {
         while (true) {
            System.out.println("Start to consume");
            consumer1.poll(1000L);
         }
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    
 public class DiscoveryKafkaConsumer {

       Consumer<String, String> consumer;
       Integer id;
       public DiscoveryKafkaConsumer(List<String> topics, Properties configs) {
       consumer = new KafkaConsumer<String, String>(configs);
       consumer.subscribe(topics);
       }


   public DiscoveryKafkaConsumer(int i, List<String> topics, Properties configs) {
       consumer = new KafkaConsumer<String, String>(configs);
       consumer.subscribe(topics);
       this.id = i;
}

   public void poll(long timeout) throws InterruptedException {

       ConsumerRecords<String,String> records = consumer.poll(timeout);
       System.out.println("Hey!Consumer #" + id + "got records:" + records);
       Map<String, List<String>> results = new HashMap<String, List<String>>();
       records.forEach((cr) -> {
    

   System.out.println("cr.topic()=" + cr.topic());
       List<String> list = results.get(cr.topic()); 
       if(list == null) {
           list = new ArrayList<>(); 
           results.put(cr.topic(), list);
           }
            list.add(cr.value());
            System.out.println("list=" + list);
       });

}

我使用Kafka客户端版本0.11.0.0。

我需要添加/配置什么才能以轮循机制方式使用消息?

共有1个答案

申辉
2023-03-14

仍然不可能从Java增加分区数。有KIP-195和相关PR应该包含在下一个版本中:https://cwiki.apache.org/confluence/display/KAFKA/KIP-195:管理员客户端。.

 类似资料:
  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我有一个Spring靴Kafka消费者 为了避免重新平衡,我尝试在KafkaContainer上调用pause()和resume(),但消费者总是在运行 我错过了什么吗?有人能指导我如何正确地达到要求的行为吗?

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我是Kafka的新手,在尝试一个示例场景时,Kafka生产者以JSON格式向消费者发送用户详细信息。我访问过类似的问题,但我无法得到我需要的答案。 如果我在终端中运行任何一个生产者或消费者,在spring boot中运行另一个生产者或消费者,我不会面临任何问题。错误发生在无限循环中(当生产者和消费者都从不同的spring boot项目启动时): 我在下面提到了消费者配置中的反序列化和受信任包: 我

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者