当前位置: 首页 > 知识库问答 >
问题:

具有相同组id的多个kafka消费者

司寇高洁
2023-03-14

我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用html" target="_blank">程序中,我创建了如下的kafka消费者。

@RetryableTopic(
        attempts = "3",
        backoff = @Backoff(delay = 1000, multiplier = 2.0),
        autoCreateTopics = "false")
  @KafkaListener(topics = "myTopic", groupId = "myGroupId")
  public void consume(@Payload(required = false) String message) {

       processMessage(message);
  
}

我的配置如下

@Bean
  public ConsumerFactory<String, Object> consumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.consumer.bootstrap.servers"));
    config.put(ConsumerConfig.GROUP_ID_CONFIG, env.getProperty("kafka.consumer.group"));
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    return factory;
  }

  @Bean
  public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, env.getProperty("kafka.consumer.bootstrap.servers"));
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多个分区,并且需要为每个分区创建一个使用者。假设我的主题有10个分区,那么我可以让同一消费者组中的10个消费者分别读取一个分区。我理解这种行为。但我担心的是如何在应用程序中创建几个消费者。

我是否必须使用具有相同功能的@KafkaListener编写多个kafka消费者?在这种情况下,如果我需要X个相同的消费者,我需要在下面写X次方法。

@RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delay = 1000, multiplier = 2.0),
            autoCreateTopics = "false")
      @KafkaListener(topics = "myTopic", groupId = "myGroupId")
      public void consume(@Payload(required = false) String message) {
    
           processMessage(message);
      
    }

我需要哪些选项或配置来实现与多个使用者的并行消费?

先谢谢你。

共有1个答案

穆彬郁
2023-03-14

< code>@KafkaListener有这个选项:

/**
 * Override the container factory's {@code concurrency} setting for this listener. May
 * be a property placeholder or SpEL expression that evaluates to a {@link Number}, in
 * which case {@link Number#intValue()} is used to obtain the value.
 * <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
 * @return the concurrency.
 * @since 2.2
 */
String concurrency() default "";

查看更多文档:https://docs.spring.io/spring-kafka/reference/html/#kafka-listener-annotation

 类似资料:
  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

  • 我试图有多个消费者的Kafka主题的多个分区与相同的groupId,这将帮助我扩大消费的消息。 根据Kafka的文件,它说: 如何让多个消费者拥有相同的消费者groupId,以实现负载平衡?

  • 我正在尝试对消费者群体进行实验 这是我的代码片段 } 当我同时运行两个spark流媒体作业时,它会出错 线程“main”java中出现异常。lang.IllegalStateException:当前没有分配给组织上的分区venkat4-1。阿帕奇。Kafka。客户。消费者内部。订阅状态。组织上的assignedState(SubscriptionState.java:251)。阿帕奇。Kafka。

  • 我以前认为设置我的消费者将始终收到他们尚未收到的消息,但最近我发现情况并非如此。这只在使用者尚未提交抵消时才起作用。在任何其他情况下,使用者将继续接收偏移大于其提交的最后偏移的消息。 由于我总是使用随机的组ID创建新的使用者,我意识到我的使用者“没有内存”,他们是新的使用者,并且他们永远不会提交偏移,因此策略将始终适用。我的疑虑就从这里开始了。假设以下场景: 我有两个客户端应用程序,A和B,每个客

  • 我正在使用spring kafka来消费来自kafka的消息。消费者监听器如下。 应用程序的单个实例,并发数为6。 该主题有6个分区。 从上面的日志中可以清楚地看到,两个用户在完全相同的时间收到了来自分区和偏移量的相同消息。 每个线程继续处理消息。最后,其中一个消费者失败了,错误如下 我知道上面的错误会在有负载或消息处理需要时间时出现。在这种情况下,处理不到一秒钟,kafka主题中的消息不到10条

  • 由于消息需求的排序,我们有一个主题和一个分区。我们有两个消费者运行在不同的服务器上,具有相同的配置集,即groupId、consumerId和consumerGroup。即 1主题- 当我们部署消费者时,相同的代码会部署在两台服务器上。当消息到来时,我们会注意到两个消费者都在消费消息,而不是只有一个处理。让消费者在两台独立的服务器上运行的原因是,如果一台服务器崩溃,至少其他服务器可以继续处理消息。