当前位置: 首页 > 知识库问答 >
问题:

Spring-Kafka并发性

澹台权
2023-03-14

我正在用Spring-Kafka写我的第一个Kafka消费者。看了一下framework提供的不同选项,对相同的选项几乎没有疑问。能否有人请澄清以下,如果你已经在它工作。

问题1:根据Spring-Kafka文档,有两种实现Kafka-Consumer的方法;“您可以通过配置MessageListenerContainer并提供消息侦听器或使用@Kafkalistener注释来接收消息”。有人能告诉我什么时候应该选择一个选项而不是另一个选项吗?

问题2:我选择了KafkaListener方法来写我的申请。为此,我需要初始化一个容器工厂实例,在容器工厂内部有控制并发的选项。我只想检查一下我对并发性的理解是否正确。

批处理确认使用者:

    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment acknowledgment,
          Consumer<?, ?> consumer) {
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("Record : " + record.value());
          // Process the message here..
          listener.addOffset(record.topic(), record.partition(), record.offset());
       }
       acknowledgment.acknowledge();
    }

初始化集装箱工厂:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
    configs.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enablAutoCommit);
    configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPolInterval);
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return configs;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    // Not sure about the impact of this property, so going with 1
    factory.setConcurrency(2);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL);
    factory.getContainerProperties().setConsumerRebalanceListener(RebalanceListener.getInstance());
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setMessageListener(new BatchAckConsumer());
    return factory;
}

共有1个答案

申宜
2023-03-14

>

  • @kafkalistener是一个消息驱动的“POJO”,它添加了负载转换、参数匹配等内容。如果实现MessageListener,则只能从Kafka获得原始的ConsumerRecord。参见@Kafkalistener注释。

    是的,并发表示线程数;每个线程创建一个使用者;它们并行运行;在您的示例中,每个分区将获得2个分区。

    如果我们是平行消费,我们是否应该考虑任何事情。

  •  类似资料:
    • 使用Spring-Cloud-Stream的kafka绑定器,如何配置并发消息消费者(在单个消费者jvm中)?如果我没有理解错的话,在使用kafka时并发使用消息需要分区,但是s-c-s文档指出,要使用分区,您需要通过partitionKeyExpression或PartitionKeyExtractorClass在生成器中指定分区选择。Kafka博士提到循环分区。 s-c-s文档根本没有提到sp

    • 我正在使用@Kafkalistener注释编写一个kafka使用者,我知道有一种方法可以使用ConcurrentKafkaListenerContainerFactory中的方法来增加来自不同分区的并发kafka使用者的数量 用于setconcurrency的Javadoc如下所示:- 运行的并发KafkaMessageListenerContainer的最大数目。来自同一分区内的消息将按顺序处理

    • 我正在编写一个基于Java的Kafka消费者应用程序。我正在为我的应用程序使用kafka-clients、Spring Kafka和Spring boot。虽然Spring boot让我可以轻松地编写Kafka消费者(无需真正编写ConcurrentKafkaListenerContainerFactory、ConsumerFactory等),但我希望能够为这些消费者定义/定制一些属性。然而,我无

    • 我已经使用Spring Kafka创建了一个Kafka消费者,并将其部署在云铸造中。该主题有10个分区。我计划将应用程序扩展到10个实例,以便每个实例可以使用来自一个分区的消息。Spring Kafka支持并发消息侦听器容器,我猜它支持从每个分区创建多个线程来使用。例如,如果我有5个消费者实例,每个消费者实例可能有2个线程从分区消耗。因为我计划为每个分区创建一个应用实例,所以使用并发消费者有什么好

    • 使用Spring Cloud Stream版本Chelsea. SR2,RabbitMQ作为消息代理。要拥有多个消费者,我们使用属性并发(入站消费者的并发)。 如果我们将并发设置为50。它从1开始,慢慢地增加消费者计数。有没有任何可能的解决方案可以使用更高的数字而不是一个来启动初始消费者计数,以提高消费者性能。