当前位置: 首页 > 知识库问答 >
问题:

Spring@KafkaListener和并发

经伟
2023-03-14

我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。

我定义了bean of

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory)
{

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setMissingTopicsFatal(false);
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    return factory;
}

和spring启动配置:

spring.kafka.listener.concurrency=10

我看到所有配置都能正常工作,我在jmx中看到了我的10个线程:

但是我做了这样的测试:

 @KafkaListener(topics = {
            "${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record)
    {
        if(record.getVersion() < 3) {
            try {
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        else
            System.out.println("It works!");

    }

如果版本是

也许我的期望不是真的,这是Kafka听众的正确行为。请帮我处理Kafka的并发性,为什么会这样?

共有2个答案

姜景焕
2023-03-14

如果你想拥有故障转移Kafka,你必须启动更多的应用程序实例。

示例:你有一个名为test的主题,有一个分区,你将用同一个Kafka组创建两个应用实例。一个实例将处理您的数据,另一个将等待并开始处理消息,以防第一个实例崩溃。如果您有N个分区,其中包含应用程序的N 1或2或3个实例,则情况相同。此外,每个实例将只有一个使用者线程。

欲了解更多信息,请在谷歌上搜索:Kafka消费群体。

伯彦君
2023-03-14

kafka不是这样工作的;您至少需要和消费者一样多的分区(由Spring容器中的concurrency控制)。

而且,一次只能有一个消费者(在一个组中)从一个分区消费,因此,即使增加分区,其他消费者也不会收到“卡住”消费者后面同一分区中的记录。

 类似资料:
  • 我想创建一个并发的,它可以处理多个主题,每个主题都有不同数量的分区。 我注意到,对于大多数分区的主题,Spring Kafka每个分区只初始化一个使用者。 示例:我已经将并发设置为8。我得到了一个听以下主题的。主题A有最多的分区-5,所以Spring-Kafka初始化了5个消费者。我期望Spring-Kafka初始化8个消费者,这是根据我的并发属性允许的最大值。 主题A有5个分区 没有初始化更多消

  • 所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?

  • 我已经实现了Kafka消费者,现在我有了一个场景。 从Kafka流2.2.5中读取数据。通过Srpingboot发布 加载数据库表1 将数据从表1复制到表2 清理桌子1 要执行上述操作,我需要使用quartz的调度作业(已编写)暂停/恢复Kafka使用者,该作业将数据从表1复制到表2。但是在这个活动中,我希望我的Kafka听众暂停,一旦复制完成,它应该继续。 我的实施:

  • 我正在尝试使用SpringKafka将kafka与我的SpringBoot(v2.0.6版本)应用程序集成。现在我想要一个消费者和一个生产者。我让制作人工作得很好,我可以看到通过控制台消费者发送到主题的消息。我无法使用消费者代码,当Kafka主题中出现新消息时,它不会被调用。 这是我的Kafka配置类: 以下是我的pom依赖项: 以及消费者代码: 我正在我的计算机上运行kafka,正如我所说的——

  • 我正在编写一个应用程序(Spring Kotlin),它可以获取Kafka的信息。如果我在声明@KafkaListener时设置autoStartup=“true”,则应用程序可以正常运行,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能。 为了避免应用程序在启动时崩溃,另一个主题建议在声明@KafkaListener时设置autoStar

  • 我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行: 在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。 另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。 附言:我不想使用@KafkaListener。