当前位置: 首页 > 知识库问答 >
问题:

Spring kafka@KafkaListener未被调用

公冶麒
2023-03-14

我正在尝试使用SpringKafka将kafka与我的SpringBoot(v2.0.6版本)应用程序集成。现在我想要一个消费者和一个生产者。我让制作人工作得很好,我可以看到通过控制台消费者发送到主题的消息。我无法使用消费者代码,当Kafka主题中出现新消息时,它不会被调用。

这是我的Kafka配置类:


@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, <path to my custom serializer>);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        return props;
    }

    @Bean
    public Map<String, Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, <path to my custom deserializer>);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, List<MyMessage>> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, List<MyMessage>>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, List<MyMessage>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ProducerFactory<String, List<MyMessage>> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, List<MyMessage>> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

以下是我的pom依赖项:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

以及消费者代码:


@Slf4j
public class MyMessageConsumer {

    @KafkaListener(topicPattern = "test_topic")
    public void receive(List<MyMessage> myMessages){
        log.info("Received payload {}", myMessages);
    }
}

我正在我的计算机上运行kafka,正如我所说的——我可以看到消息出现在控制台消费者中。我基于Spring的消费者不会收到消息的原因是什么?

编辑:这是反序列化器类:


public class TagEventDeserializer implements Deserializer<List<MyMessage>> {
    @Override
    public void configure(Map map, boolean b) {

    }

    @Override
    public List<MyMessage> deserialize(String s, byte[] bytes) {
        ObjectMapper objectMapper = new ObjectMapper();
        List<MyMessage> myMessages = null;
        try{
            myMessages = objectMapper.readValue(bytes , List.class);
        } catch (Exception e){
            e.printStackTrace();
        }
        return myMessages;
    }

    @Override
    public void close() {

    }
}

共有1个答案

宇文鸿振
2023-03-14

因为由于缺少原型注释,您的类MyMessage消费者没有被Spring扫描,所以例如向此类添加@Component将解决您的问题:

@Slf4j
@Component
public class MyMessageConsumer {

    @KafkaListener(topicPattern = "test_topic")
    public void receive(List<MyMessage> myMessages){
        log.info("Received payload {}", myMessages);
    }
}

我认为在您的@KafkaListner注释中指定一个消费者组是个好主意

 类似资料:
  • 我想为整个Kafka流编写一个集成测试。 在我的生产代码中,我有: 在我的测试代码中,我使用KafkaProducer 我希望有一个钩子,表明调用了KafkaListener。我可以在测试中加入一些延迟,但这是一个糟糕的做法,我想避免它。 有没有更好的方法来等待正在处理的?

  • 问题内容: 我有一个非常简单的Servlet和一个非常简单的HttpSessionListener: 我的方法从未被调用(没有日志输出),最终我在调用getSession()的那一行上得到一个 我尝试拨打电话时也没有问题,但存在相同的问题。 我不明白- 注释不足以调用我的侦听器吗?Eclipse甚至在下方将其显示为侦听器。 问题答案: 原来这是愚蠢的Eclipse问题之一… Project-> C

  • 我正在使用一个简单的并每隔毫秒调用: 问题是虽然计时器在正确的时间结束,但是方法并没有每毫秒被调用一次。每次都少了几毫秒。 我使用日志检查它,发现的值在最后是597,而它应该是999,因为每次调用时它都会得到1。有精度误差吗?

  • 我的从未被调用,即使我在inapp对话框中单击立即购买,logcat也不会打印任何内容。

  • 我是写测试和使用Mockito的新手。我在Stackoverflow上阅读了类似的主题,并做了建议的更改,确保所考虑的类/接口/方法是开放的。 我试图跟踪这个 模仿构造函数注入的依赖项 这是我目前想出来的测试 但我一直得到的回应是 即使我在测试中没有提到这种方法,我也得到了这种反应。这是我的演示者注册方法。我已经改变了类/接口 同样地 这里是接口 感谢您的帮助。

  • 问题:为什么从不调用? 我在xml配置中扩展了,如 MyEntityManagerFactoryBean MyInterceptor: 更新:[解释为什么自定义脏政策看起来不像我的方式] 我希望每次更改实体中的内容时更新时间戳,但除外。同时,应该是持久的,而不是暂时的(意味着导致实体变脏)。 由于我使用Spring事务和Hibernate模板,有一些细微差别: 1) 我无法在每个setter的末尾