当前位置: 首页 > 知识库问答 >
问题:

并非所有的kafka使用者都被分配到分区

狄德泽
2023-03-14

我有10个消费者和10个分区。我取分区数

int partitionCount = getPartitionCount(kafkaUrl);

并且使用相同的group.id创建相同数量的消费者。

    public void listen() {
        try {
            String kafkaUrl = getKafkaUrl();
            int partitionCount = getPartitionCount(kafkaUrl);
            Stream.iterate(0, i -> i + 1)
                    .limit(partitionCount)
                    .forEach(index -> executorService.execute(() ->
                            consumerTask.invokeKafkaConsumerTask(prepareConsumerConfig(index, kafkaUrl), INPUT_TOPIC)));
        } catch (Exception exception) {
            logger.error("Cannot receive event from kafka ", exception);
        }


    public void invokeKafkaConsumerTask(Properties properties, String topicName) {
        try(KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(topicName));
            logger.info("[KAFKA] consumer created");
            invokeKafkaConsumer(consumer);
        } catch (IllegalArgumentException exception) {
            logger.error("Cannot create kafka consumer ", exception);
        }
    }

    private void invokeKafkaConsumer(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
                if (consumerRecords.count() > 0) {
                    consumeRecords(consumerRecords);
                    consumer.commitSync();
                }
            }
        } catch (Exception e) {
            logger.error("Error while receiving records ", e);
        }
    }
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CLIENT-ID                                                        
topicName  1          89391           89391           0               consumer0
topicName  3          88777           88777           0               consumer1
topicName  5          89280           89280           0               consumer2
topicName  4          88776           88776           0               consumer2
topicName  0          4670991         4670991         0               consumer0
topicName  9          23307           89343           66036           consumer4
topicName  7          89610           89610           0               consumer3
topicName  8          88167           88167           0               consumer4
topicName  2          89138           89138           0               consumer1
topicName  6          88967           88967           0               consumer3

我也发现很少这样的日志->

Setting newly assigned partitions:[empty]

共有1个答案

米承嗣
2023-03-14

[解决方案]有趣的情况我更改了group.id和partition.assignment.strategy,添加了auto.offset.reset=aresty,看起来它可以工作了...

 类似资料:
  • 多台机器生成事件。这些事件被发送到我们的Kafka集群,其中每台机器都有自己的主题(app.machine-events.machine-name)。因为顺序在每台机器的基础上很重要,而分区大小现在不是问题,所以所有主题都由一个分区组成。因此,目前,N个主题也意味着N个分区。 消费/处理应用程序使用了kafka-streams,我们给出了/“machine-event-processor”,它对每

  • 为了使用Kafka通用地发布消息,我使用类名作为主题: 服务器属性(我从默认属性中唯一更改的内容): 注意:我还尝试了以下用户设置:

  • 我有一个带有15个分区的kafka主题[0-14],我正在运行带有5个并行的flink。因此,理想情况下,每个并行flink使用者应该分别使用3个分区。但即使在多次重启之后,很少有Kafka分区不被任何flink工人订阅。 注意:如果我以1个并行度开始作业,则作业工作非常好。 Flink版本:1.3.3

  • 我知道kafka将一个主题的数据安排在许多分区上,一个消费者组中的消费者被分配到不同的分区,从那里他们可以接收数据: 我的问题是: 术语,它们是由主机/IP标识的,还是由客户端连接标识的? 换句话说,如果我启动两个线程或进程,使用相同的消费者组运行相同的Kafka客户端代码,它们被认为是一个消费者还是两个消费者?

  • 我有一个名为“test-topic”的主题,有3个分区。 当我启动一个将group-id设置为“test-group”的使用者(consumer-1)时,它连接并读取主题上的所有分区。到目前为止还好。 当我在同一个组中启动另一个消费者(consumer-2)时,问题就出现了。我希望在两个消费者之间划分分区时能够重新平衡,例如,消费者-1得到分区0和2,消费者-2得到分区1。这种情况不会发生,当然我

  • 我知道每个分区分配给一个Kafka消费者(在消费者组内),但一个Kafka消费者可以同时使用多个分区。如果每个用户都有一个到分区的开放连接,那么我可以想象每个用户都有成千上万个打开的连接。如果这是真的,那么在决定分区数量时,这似乎是需要注意的,不是吗?