当前位置: 首页 > 知识库问答 >
问题:

Spring启动Kafka不起作用-消费者没有收到消息

柴赞
2023-03-14

我正在尝试运行一个简单的Spring Boot Kafka应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现这个教程,但当我启动应用程序时,会发生以下情况:

我可以在控制台中写入,但消费者没有收到任何消息
这是我的SpringApplication类:

@SpringBootApplication(scanBasePackages = "com.springmiddleware")
@ComponentScan("com.springmiddleware")
@EnableAutoConfiguration
@EntityScan("com.springmiddleware")
public class SpringMiddlewareApplication implements CommandLineRunner{



    public static void main(String[] args) throws Exception {

        SpringApplication.run(SpringMiddlewareApplication.class, args);

    }

    @Autowired
    private Producer sender;

    @Override 
    public void run (String... strings) {
        sender.send("Hello world");
    }

}

application.yml:

spring:
  kafka:
    bootstrap-servers: localhost:8080

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

消费者类、生产者类及其配置类与教程中所写的相同。< br >在我的server.properties文件中,我有:

zookeeper.connect=localhost:8080

在 zookeeper.properties 中:

clientPort=8080

在 application.yml 中指定的相同端口。在启动应用程序之前,我运行

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

.\bin\windows\kafka-server-start.bat config\server.properties

更新

这是ReceiverConfig类:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

这是 SenderConfig 类:

    @Configuration
public class SenderConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

这是 Consumer 类中的方法侦听

@KafkaListener(topics = "${app.topic.foo}")
    public void listen(@Payload String message) {
        System.out.println("Received " + message);
    }

生产者类:

@Service
public class Producer {

     @Autowired
     private KafkaTemplate<String, String> kafkaTemplate;

     @Value("${app.topic.foo}")
        private String topic;

     public void send(String message){
            kafkaTemplate.send(topic, message);
        }
}

更新2

〔2019-04-01 17:23:52492〕INFO为客户端建立会话0x100435950880000,协商超时6000/0:0:0:0:0:1:6079(org.apache.zookeeper.server.ZooKeeperServer)〔2019-004-01 17:23:52539〕INFO在处理会话ID:0x100435950980000类型时获得用户级KeeperException:create cxid:0x1 zxid:0xef txtype:-1 reqpath:n/a错误路径:/cconsumers错误:KeeperErrorCode=NodeExists for/consumers(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52555]INFO处理会话时获得用户级KeeperException:0x100435950880000类型:create cxid:0x2zxid:0xf0 txntype:-1 reqpath:n/a错误路径:/brokers/ids错误:KeeperErrorCode=NodeExists for/brokers/ids(org.apache.szookeepr.server.PrepRequest Processor)[2019-04-01 17:23:52555]INFO获得用户级处理会话ID时的KeeperException:0x100435950880000类型:create cxid:0x3 zxid:0xf1 txtype:-1 reqpath:n/a错误路径:/brokers/topics错误:KeeperErrorCode=NodeExists for/brokers/ctopics(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52555]信息处理会话ID:0x100435850880000类型时获得用户级KeeperExcept:create cxid:0x4 zxid:0xf2 txtype:-1reqpath:n/a错误路径:/config/changes错误:KeeperErrorCode=NodeExists for/config/change(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52570]信息处理sessionid时获得用户级KeeperException:0x100435950880000类型:create cxid:0x5 zxid:0xf3 txtype:-1 reqpath:n/a错误路径:/admin/delete_tops错误:Keeper ErrorCode=Node Exists for/admin/deelete_tops(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52570]信息处理会话时获得用户级KeeperException:0x100435950880000类型:create cxid:0x6 zxid:0xf4 txntype:-1 reqpath:n/a错误路径:/brokers/seqid错误:KeeperErrorCode=NodeExists for/brokers/seqid处理会话ID时出现KeeperException:0x100435950880000类型:create cxid:0x7 zxid:0xf5 txtype:-1 reqpath:n/a错误路径:/is_change_notification错误:KeeperErrorCode=NodeExists for/is_change_notification(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52586]信息处理会话ID:0x100435850880000类型时出现用户级KeeperExceptionzxid:0xf6 txtype:-1 reqpath:n/a错误路径:/latest_producter_id_block错误:KeeperErrorCode=NodeExists for/latest.producter_id _block(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52586]信息处理会话id:0x100435950880000类型:create cxid:0x9 zxid:0xf7 txtype:-1 reqpath:n/a错误路径:/log_dir_event_notification错误:KeeperErrorCode=NodeExists for/log_dir_event_notification(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52602]INFO在处理会话ID:0x100435950880000类型时获得用户级KeeperException:create cxid:0xa zxid:0xf8 txtype:-1 reqpath:n/a错误路径:/config/topics错误:Keeper ErrorCode=Node Exists for/config/ttopics(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52602]INFO处理会话时获得用户级KeeperException:0x100435950880000类型:create cxid:0xb zxid:0xf9 txntype:-1 reqpath:n/a错误路径:/config/clients错误:KeeperErrorCode=NodeExists for/config/customers(org.apache.szookeepr.server.PrepRequest Processor)[2019-04-01 17:23:52617]INFO获得用户级处理会话ID时出现KeeperException:0x100435950880000类型:create cxid:0xc-zxid:0xfa txtype:-1 reqpath:n/a错误路径:/config/users错误:KeeperErrorCode=NodeExists for/config/user(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:52617]信息处理会话ID:0x100435850880000类型时出现用户级KeeperExceptionreqpath:n/a错误路径:/config/brokers错误:KeeperErrorCode=NodeExists for/config/browkers(org.apache.zookeeper.server.PrepRequestProcessor)[2019-04-01 17:23:53564]信息在处理sessionid:0x100435950880000类型时获得用户级KeeperException:multi-cxid:0x3a-zxid:0xff-txtype:-1 reqpath:n/a中止剩余的多操作。错误路径:/admin/preferred_replicate_election错误:KeeperErrorCode=/admin/preferred_Replicate_eelection的NoNode(org.apache.gookeeper.server.PrepRequestProcessor)

共有1个答案

潘自强
2023-03-14

在您的 application.yml 中,您指定了 zookeeper 端口而不是 kafka 代理端口

spring:
  kafka:
    bootstrap-servers: localhost:8080

在上面,您应该定义kafka代理的端口,即server.properties文件的< code>port=的值。

默认情况下,Spring Boot应用程序在端口8080上运行,因此请不要对Zookeeper端口使用相同的端口,除非您更改了Spring Boot应用的默认端口。

所以在server.properties中,有port=9092zookeeper.connect=localhost:2181,在application.yml中,有如下:

spring:
  kafka:
    bootstrap-servers: localhost:9092

然后在zookeeper.properties中,让clientPort=2181。然后以相同的顺序重新启动Zoo,Kafka服务器和Spring Boot应用程序。

更新:

Kafka的新版本在服务器中使用listener=PLAINTEXT://localhost:9092而不是port=9092。属性文件。所以试着替换它。

 类似资料:
  • 消费者使用Spring的JavaConfig类如下: Kafka主题侦听器使用@KafkaListener注释,如下所示: 我的pom包括依赖项: 现在当我打包到war并部署到tomcat时,它不会显示任何错误,即使在调试模式下也不会显示任何错误,只是部署war什么都没有。 请帮助我了解是否缺少触发kafkalistner的某些配置。 谢谢Gary我添加了上下文。xml和web。xml,但我得到了

  • 我正在使用以下在docker上运行kafka、zookeeper和kafdrop: 我有一个具有以下配置的Spring Boot Producer应用程序-: 在我的中,我有以下内容: 这是一个单独的应用程序,我在我的服务中这样称呼Kafka制作人: 在一个完全不同的spring引导应用程序中,我有一个像这样的使用者: 我可以看到消费者正在连接到代理,但是有消息的日志。下面是我能看到的完整日志:

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我不知道是怎么回事,我的java客户机消费者用@KafkaListener注释后没有收到任何消息。当我通过命令行创建消费者时,它可以工作。同样,Producer也能按预期工作(同样在java中)。有人能帮我理解这种行为吗? application.yml 生产者配置: 消费者配置: 制作人 Spring控制器: 这是我的控制台输出,正如您所看到的,它发送一条消息,但该方法不接收任何内容。如果我没有

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是