当前位置: 首页 > 知识库问答 >
问题:

如何使用Spring-Kafka实现消费者线程安全

慎星纬
2023-03-14

我正在使用Spring boot2.1.7。RELEASE和spring-kafka 2.2.7。RELEASE。我正在使用@KafkaListener注释来创建一个消费者,我正在使用消费者的所有默认设置。

这是我的消费者配置:

@Configuration
@EnableKafka
public class KafkaConsumerCommonConfig implements KafkaListenerConfigurer {

      @Bean
      public <K,V> ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(primaryConsumerFactory());
        factory.getContainerProperties().setMissingTopicsFatal(false);
        return factory;
      }

      @Bean
      public DefaultKafkaConsumerFactory<Object, Object> primaryConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(sapphireKafkaConsumerConfig.getConfigs());
      }

}

由于某些原因,我在同一个应用程序中有多个使用者,如下所示。

@KafkaListener(topics = "TEST_TOPIC1")
public void consumer1(){

}

@KafkaListener(topics = "TEST_TOPIC2")
public void consumer2(){

}

@KafkaListener(topics = "TEST_TOPIC3")
public void consumer3(){

}

尽管如此,根据关于“消费者线程安全”的合流文件

一个线程中不能有多个属于同一组的使用者,也不能有多个线程安全地使用同一使用者。每个线程一个使用者是规则。要在一个应用程序中运行同一组中的多个使用者,需要在各自的线程中运行每个使用者。将消费者逻辑封装在自己的对象中,然后使用Java的ExecutorService启动多个线程,每个线程都有自己的消费者,这是很有用的。

现在我的问题是,当使用上面的代码使用spring kafka来解决这个场景时,我应该做些什么额外的事情吗?还是不重要,因为我没有指定组是随机生成的?请提出建议。

共有1个答案

景成和
2023-03-14

您的所有侦听器都针对不同的主题进行了配置。所以,他们的团队完全没有关系。当同一主题有多个消费者时,团队就会出现。

 类似资料:
  • 我正在使用Spring Kafka消费者。我已将并发设置为10,并创建了5个消费者(用于5个主题)。所以有50个Spring Kafka消费者线程。 Kafka消费者可以使用的最大线程数是多少?如何增加此线程池的大小?我查阅了spring文档,但没有发现任何相关内容。

  • Kafka的doc给出了一种方法,大约用以下描述: 每个线程一个消费者:一个简单的选择是为每个线程提供自己的消费者 我的代码: 但它不起作用,引发了一个异常: JAVAutil。ConcurrentModificationException:KafkaConsumer对于多线程访问不安全 此外,我还阅读了Flink(一个用于分布式流和批处理数据的开源平台)的源代码。Flink使用多线程消费程序与我

  • 如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。

  • 我编写了一个Kafka消费者(使用Spring Kafka),它从单个主题读取并且是消费者组的一部分。一旦消息被消费,它将执行所有下游操作并继续下一个消息偏移量。我已将其打包为WAR文件,我的部署管道将其推送到单个实例。使用我的部署管道,我可能可以将此工件部署到我的部署池中的多个实例。 但是,当我希望多个消费者作为我的基础设施的一部分时,我无法理解以下内容- > 实际上,我可以在部署池中定义多个实

  • 我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但

  • 以下是我的情况: 我们有一个Spring cloud Stream 3 Kafka服务连接到同一个代理中的多个主题,但我想基于属性控制连接到特定主题。 每个主题都有自己的活页夹和绑定,但代理对所有人来说都是一样的。 我尝试使用下面的属性禁用绑定(这是我到目前为止找到的唯一解决方案),这适用于StreamListener不接收消息,但与主题的连接和重新平衡仍在发生。 我想知道活页夹级别是否有任何设置