当前位置: 首页 > 知识库问答 >
问题:

Kafka:同一消费者组中的多个实例监听同一分区内的主题

颜思淼
2023-03-14

我有两个kafka consumer实例,配置了相同的消费者组,并监听相同主题中的分区0。问题是我发消息到题目的时候。消息由两个实例使用,这两个实例应该不会发生,因为它们在同一个组中。我使用Spring Boot配置类来配置它们。

以下是配置:

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

以下是听众:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "${kafka.topic.orders}", partitions = "0")})
public void consume(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

    log.info("message received at " + orderTopic + "at partition 0");
    processRecord(record, acknowledgment);
}

共有1个答案

汲雅珺
2023-03-14

Kafka不是这样工作的;当你手动分配这样的分区(@Topic分区)时,你是在明确告诉Kafka你想从那个分区接收消息——消费者将分区分配给自己。

换句话说,通过手动分配,您将负责分配分区。

您需要使用组管理,让Kafka将主题分配给实例。

使用< code >主题= " ... "Kafka会完成这个任务。如果没有足够的主题,实例就会闲置。您至少需要与实例一样多的分区,才能让所有实例都参与进来。

 类似资料:
  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。

  • 我有4个分区和4个消费者(例如A、B、C、D)。如何使用使用者组配置哪个使用者将从哪个分区读取数据。我用的是Kafka的春靴。

  • 我有一个将消息写入主题/分区的生产者。为了保持顺序,我希望使用单个分区,我希望12个使用者读取来自这个分区的所有消息(没有使用者组,所有消息都应该发送给所有使用者)。这是可以实现的吗?我读过一些论坛,每个分区只有一个用户可以阅读。

  • 我有一个spring boot项目,我是spring-kafka来连接底层的kafka事件枢纽。 我不得不在同一节消费者课上听2个不同的话题。我有两种方法可以这样做。 一个是要有两个这样的Kafka听众: 另一种方法是在同一个kafkaListener中有两个主题,如下所示 ===================edit===============application.yml中的Kafka属性

  • 默认情况下,多个Kafka消费者可以从同一主题的同一分区中阅读吗?默认情况下,我的意思是,由于group.id不是强制性的,我想知道如果我在没有指定任何group.id的情况下生成多个kafka消费者,并给他们相同的主题和分区名,他们能够从相同的分区读取吗?我理解,如果我为每个Kafka消费者指定不同的组名,那么所有消费者都可以从同一个分区读取。

  • 我有1个消费者群体和5个消费者。也有5个分区,因此每个消费者得到1个分区。 CLI还显示 bin/Kafka-console-consumer . sh-bootstrap-server localhost:9092-Topic Topic-1-from-beginning-partition { n }正确显示每个分区的不同消息。 然而,我经常看到两个或两个以上的消费者在处理同一条信息,而且对于