当前位置: 首页 > 知识库问答 >
问题:

spring kafka:如果我在应用程序中定义了一个记录制作人,kafka消费者就不会收到记录

娄建义
2023-03-14

我正在使用Spring和Spring Kafka编写一个小型PoC。我的目标是让生产者和消费者都从这个主题中写作(例如阅读)。

我有一个奇怪的情况:

  • 制作人正确地制作了这些记录(我可以通过Python脚本使用它们)

下面是我的代码——它与文档示例非常相似。更准确地说,问题来自这样一个事实,即KafkanConsumerConfiguration中的bean不是由Spring创建的(也就是说,构建它们的方法永远不会被调用)。

Kafkaproducer配置。JAVA

@Configuration
public class KafkaProducerConfiguration {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }
}

消息ender.java

@Component
public class MessageSender {
    final static private Logger log = Logger.getLogger(MessageSender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostConstruct
    public void onConstruct() throws InterruptedException {
        log.info("Sending messages...");
        for (int i = 0; i < 100; ++i) {
            kafkaTemplate.send("mytopic", "this is a message");
            Thread.sleep(1000);
        }
        kafkaTemplate.flush(); // NOTE: no changes if I move this call in the loop
        log.info("Done sending messages");
    }
}

Kafka消费者配置。JAVA

@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:32768");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-service");
        return props;
    }
}

MyMessageListener。JAVA

@Service
public class MyMessageListener {
    final static private Logger log = Logger.getLogger(MyMessageListener.class);

    @PostConstruct
    public void onConstruct() {
        log.info("Message listener started");
    }

    @KafkaListener(topics = "mytopic")
    public void onMessageReceived(String message) {
        log.info("Got message: "+ message);
    }
}

以下是应用程序生成的日志,供参考:https://pastebin.com/BY783jiL.如您所见,没有创建消费bean(否则将出现一个块ConsumerConfig values:…)。

以下是我尝试过但没有成功的几件事:

  • 将生产者和消费者配置放在同一个配置类中
  • 更改Kafka消费者配置中bean的名称(并在MyMessageListener.onMessageReception方法上添加注释属性容器工厂="myBeanName"
  • 将类的名称Kafka消费者配置更改为其他内容
  • 在我的Kafka消费者配置中添加一个与kafka无关的@Bean以查看它是否会被创建(它会)

版本:Spring Boot 1.5.9,Spring-Kafka:1.1.7。

我已经撕了几个小时的头发了,非常感谢您的帮助。

谢谢

共有2个答案

齐锐进
2023-03-14

刚刚发现问题<代码>消息发送者。onConstruct实际上需要花费大量时间来执行(100秒),在此期间,它会阻止Spring创建其他bean。

余弘毅
2023-03-14
kafkaTemplate.send("mytopic", "this is a message");

您永远不应该以@PostConstruct方法开始与外部服务交互——您需要等待应用程序构建完成后再这样做。

实现SmartLifeyle,为isAutoStartup返回true并将该代码移动到start()

或者实现ApplicationListener

无论哪种方式都将确保应用程序已准备就绪。

 类似资料:
  • 我的问题是关于Kafka在爪哇的消费者 > 已启动Kafka服务器 创建的主题 创作者 创建的消费者 我在终端中做的所有这些事情,工作正常,能够在消费者处正确接收日志。运行下面的消费者(在Java),但没有收到任何记录。它继续汇集在 也没有收到任何记录。 下面给出的我的 java 消费者代码 请告诉我在java消费者类中接收消息,我在配置中做错了什么吗?属性中的“group.id”是怎么回事? 下

  • 我写了一个Kafka消费者从主题中获取所有记录,然后只进入下一步,但它没有获取所有记录。

  • 问题内容: 我有一个dropwizard应用程序,在该应用程序中,我配置了logger附加程序,使其文件如下: 并且,在我的应用中创建了记录器: 在main()中进行一些测试记录: 该应用程序启动并运行没有问题。但是我没有在stdout或mylogs.log文件中得到任何日志(当然,除了Jetty访问日志之外,这些日志也已正确打印到mylogs.log中)。相反,如果我删除configuratio

  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 我正在尝试使用reamer-kafka来消耗消息。其他一切都很好,但我想为失败的消息添加重试(2)。spring-kafka已经默认重试失败记录3次,我想使用reamer-kafka实现相同。 我用SpringKafka作为反应Kafka的包装。以下是我的消费者模板: 让我们考虑消耗方法如下 我使用以下逻辑在失败时重试消耗方法。 如果当前消费者记录异常失败,我想重试使用该消息。我试图用另一次重试(

  • 我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。 networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。 消费者: 一个制作人: