当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka单个主题的多个消费者消耗不同的消息

通正平
2023-03-14

在我的Spring Boot Kafka应用程序中,我有以下使用者配置:

@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));

    return factory;
}

消费者:

@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {

    // do some logic

    ack.acknowledge();
}

如果我理解正确的话,现在我有一个消费者的实例。我想增加post消费者的数量,假设有5个消费者将消费来自${kafka.topic.post.send}的不同(不同)消息,以加快消息消费。

它是否像添加工厂一样简单。setConcurrency(5) 至我的PostKafkAlisterContainerFactory(),例如:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {

    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    factory.setConcurrency(5);

    return factory;
}

或者我需要做一些额外的工作来实现它吗?

共有1个答案

夹谷山
2023-03-14

阿帕奇·Kafka不是这样工作的。一个想法是,在单个线程的同一个分区中始终存在进程记录。那家工厂。setConcurrency(5) 肯定是关于一个主题中有多少个分区。所以,如果你只有一个,这个属性不会带来任何价值。如果主题中有10个分区,那么Spring Kafka会生成5个线程,每个线程将处理2个分区。

我想说的是,这在参考手册中非常清楚:

假设提供了6个TopicPartition,并发性为3;每个容器将获得2个分区。对于5个TopicPartition,2个容器将获得2个分区,第三个容器将获得1个分区。如果并发性大于TopicPartitions的数量,那么并发性将被下调,这样每个容器将获得一个分区。

因此,如果您想拥有所描述的这种并发性,您确实需要在主题中创建5个分区。只有在这之后,您才能并行处理同一主题中的记录。

 类似资料:
  • 我的问题与单个消费者从多个话题消费有关。假设所有主题都加载了1M个记录,一个使用者必须处理这些记录。它将按照什么顺序从主题中读取(我的意思是首先读取哪个主题/分区,等等) Kafka内部资料的任何链接会有帮助吗?

  • 我怎样才能暗示SpringKafka把每一个话题传播给一个不同的消费者呢? 干杯

  • 我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。

  • 我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。

  • 问题内容: 我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。 有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。 问题答案: 我们可以使

  • 我是AMQP的新手,正在尝试为RabbitMQ系统制定一个通知架构。 我想要一个主题交换(通知交换,比方说),特别是因为我想灵活地使用主题交换附带的路由密钥和队列,以及将来扩展该主题的更多选项。不过,我可能是错的,因为... 我还想让两个或更多的消费者使用每个通知。作为基线,我希望发布的每个通知都在数据库中结束。此外,我希望每个通知都可以由客户端应用程序使用(例如,web应用程序使用并进一步通过套