当前位置: 首页 > 知识库问答 >
问题:

@KafkaListener(Spring)的开头有问题

林烨烨
2023-03-14

我正在编写一个应用程序(Spring Kotlin),它可以获取Kafka的信息。如果我在声明@KafkaListener时设置autoStartup=“true”,则应用程序可以正常运行,但前提是代理可用。当代理不可用时,应用程序在启动时崩溃。这是不受欢迎的行为。应用程序必须工作并执行其他功能

为了避免应用程序在启动时崩溃,另一个主题建议在声明@KafkaListener时设置autoStartup=“false”。这真的有助于防止启动时崩溃。但现在我无法手动成功启动KafkaListener。在其他示例中,我看到了Kafkalistener注册表的自动连接,但当我尝试这样做时:

@Service
class KafkaConsumer @Autowired constructor(
        private val kafkaListenerEndpointRegistry: KafkaListenerEndpointRegistry
) {

IntelliJ Idea警告:

无法自动连线。未找到“KafkaListenerEndpointRegistry”类型的bean。

当我尝试在没有自动连线的情况下使用KafkaListenerEndpointRegistry并执行以下代码时:

@Service
class KafkaConsumer {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val kafkaListenerEndpointRegistry = KafkaListenerEndpointRegistry()

    @Scheduled(fixedDelay = 10000)
    fun startCpguListener(){
        val container = kafkaListenerEndpointRegistry.getListenerContainer("consumer1")
        if (!container.isRunning)
            try {
                logger.info("Kafka Consumer is not running. Trying to start...")
                container.start()
            } catch (e: Exception){
                logger.error(e.message)
            }
    }

    @KafkaListener(
            id = "consumer1",
            topics = ["cpgdb.public.user"],
            autoStartup = "false"
    )
    private fun listen(it: ConsumerRecord<JsonNode, JsonNode>, qwe: Consumer<Any, Any>){
        val pay = it.value().get("payload")
        val after = pay.get("after")
        val id = after["id"].asInt()
        
        val receivedUser = CpguUser(
                id = id,
                name = after["name"].asText()
        ) 
        logger.info("received user with id = $id")
        }
    }
}

kafkalistener注册表。getListenerContainer(“consumer1”)始终返回null。我猜是因为我没有自动连线Kafka利斯特。我该怎么做?或者,如果我的答案还有其他解决方案,我将非常感谢任何帮助!谢谢

有Kafka配置:

@Configuration
@EnableConfigurationProperties(KafkaProperties::class)
class KafkaConfiguration(private val props: KafkaProperties) {

    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
        factory.consumerFactory = consumerFactory()
        factory.setConcurrency(1)
        factory.setMessageConverter(MessagingMessageConverter())
        factory.setStatefulRetry(true)

        val retryTemplate = RetryTemplate()
        retryTemplate.setRetryPolicy(AlwaysRetryPolicy())
        retryTemplate.setBackOffPolicy(ExponentialBackOffPolicy())
        factory.setRetryTemplate(retryTemplate)
        val handler = SeekToCurrentErrorHandler()
        handler.isAckAfterHandle = false
        factory.setErrorHandler(handler)
        factory.containerProperties.isMissingTopicsFatal = false

        return factory
    }

    @Bean
    fun consumerFactory(): ConsumerFactory<Any, Any> {
        return DefaultKafkaConsumerFactory(consumerConfigs())
    }

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to props.bootstrap.address,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG to listOf(MonitoringConsumerInterceptor::class.java),
                ConsumerConfig.CLIENT_ID_CONFIG to props.receiver.clientId,
                ConsumerConfig.GROUP_ID_CONFIG to props.receiver.groupId,
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
                ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
                ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to true
        )
    }
}
  • spring boot版本:2.3.0
  • SpringKafka版本:2.5.3
  • Kafka客户端版本:2.5.0

共有1个答案

桂学
2023-03-14

忽略IntelliJ关于自动布线的警告;豆子确实存在;只是IntelliJ无法检测到。

 类似资料:
  • 我已经通过Spring Boot实现了一个Kafka简单主题消费者,我想使用@KafkaListener注释来获取消息。到目前为止还不错,我对Kafka的方法很满意。我现在面临的问题是,我正在阅读的由第三方发送到这个主题的内容,我认为是由云流绑定生成的,并且有如下标题:ëë½ contentType“text/plain” originalContentType“application/jso

  • 所以我很想知道。这两个有什么不同?2.在哪种场景下选择哪一种?

  • 我正在与spring boot spring@KafkaListener合作。我期望的行为是:我的Kafka侦听器以10个线程读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。 我定义了bean of 和spring启动配置: 我看到所有配置都能正常工作,我在jmx中看到了我的10个线程: 但是我做了这样的测试: 如果版本是 也许我的期望不是真的,这是Kafka听众的正确行为。请

  • 我试图在不使用@Kafkalistener的情况下编写kafka consumer,下面是我用于配置侦听器的代码行: 在这里,我如何配置topic和listener方法,我的consumer类可以有多个方法。 另外,我想知道在将@kafkalistener与Kafka流一起使用时是否会遇到任何潜在问题。 附言:我不想使用@KafkaListener。

  • 我正在尝试使用SpringKafka将kafka与我的SpringBoot(v2.0.6版本)应用程序集成。现在我想要一个消费者和一个生产者。我让制作人工作得很好,我可以看到通过控制台消费者发送到主题的消息。我无法使用消费者代码,当Kafka主题中出现新消息时,它不会被调用。 这是我的Kafka配置类: 以下是我的pom依赖项: 以及消费者代码: 我正在我的计算机上运行kafka,正如我所说的——

  • 我只想了解@kafkaListener的范围是什么,原型还是单例。在单个主题的多个消费者的情况下,返回的是单个实例还是多个实例。在我的情况下,我有多个客户订阅单个主题并获得报告。我只是想知道如果 > 多个客户希望同时查询报告。在我的例子中,我在成功使用消息后关闭容器,但同时如果其他人想要获取报告,则容器应该打开。 如何将作用域更改为与容器的id相关联的原型(如果不是),以便每次都可以生成单独的实例