当前位置: 首页 > 知识库问答 >
问题:

为什么Kafka的Spring战靴启动失败?

楚元章
2023-03-14

有一个带有kafka依赖的Spring启动应用程序,有两个kafka主题,需要从它们中读取消息

tacocloud.orders.topic
tacocloud.tacos.topic

并且已经成功地在里面发送了消息

配置kafka配置听这个主题像这样

 package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

  // Order

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
        Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
        @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco

    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
        @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
        @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}

因此,有两个DefaultKafkaClient erFactory和两个ConAutomtKafkaListenerContainerFactory Aften,创建了一个带有@KafkaListener日志消息的服务:

package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics ="tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}

应用yml文件

spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic

但是,当我启动我的Spring启动应用程序时,我看到错误消息(((

2022-04-15 21:38:35.918错误13888---[restartedMain]开机。SpringApplication:应用程序运行失败

进程已完成,退出代码为0

共有1个答案

商华藏
2023-03-14

谢谢你的样品。

所以,我在本地打开它,并在这个bean定义中放置了一个断点:

@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(@Qualifier("orderConsumerConfig")
    Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}

orderConsumerConfig地图如下所示:

orderConsumerConfig = {LinkedHashMap@8587}  size = 1
 "orderConsumerConfig" -> {HashMap@8600}  size = 5
  key = "orderConsumerConfig"
  value = {HashMap@8600}  size = 5
   "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
   "template" -> {LinkedHashMap@8611}  size = 1
   "topic" -> "tacocloud.orders.topic"
   "bootstrap.servers" -> "localhost:29888"
   "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"

因此,您的KafkaConsumer没有正确初始化,这并不奇怪。您的目标地图配置隐藏在该地图的orderConsumerConfig条目下。

你介意和我分享一下你是从哪里了解到映射bean上的@ConfigurationProperties的吗?那该如何作为依赖注入呢?

 类似资料:
  • 我正在使用Spring Boot实现rest服务。实体类在一个单独的包中定义。所以我在A中添加了组件注释pplication.java. 这是我的控制器类: 我还创建了一个JPA数据存储库,扩展了JPARepository,并在其中添加了自定义查询代码。 下面是实现服务接口的servicebean类 当我启动应用程序并在浏览器中键入以下url时“http://localhost:8080/api/

  • 笔记: 必须安装Java。获取JRE(http://www.java.com)或者JDK 从环境变量JAVA_HOME(C:\Program Files\Java\jdk1.8.0_251)...13:07:44.382[main]ERRORegistry.java:35ServiceDiscoveryIorg.kie.api.KieService: 131-加载失败,因为已经存在一个服务ervi

  • 我正在尝试在Windows Server 2019上使用Zookeeper(3.7.0)集成设置3台Solr(8.4.0)服务器。每个服务器都安装了一个Solr实例和一个Zookeeper。我面临的问题是,我在尝试启动指向多个Zookeeper IP的Solr时出错: .\solr start-c-z“172.29.70.47:2181172.29.70.48:2181” 控制台输出: 无效的命令

  • 我将遵循“Pro Spring Boot”一书中的“扩展Spring Boot应用程序”一章。在本章中,作者首先解释如何创建自己的spring boot starter-。源代码可以在这里找到。我对maven和spring都是新手。 如果我使用与编写器1.3.3相同的Spring启动版本,它可以正常工作。释放。但是它不适用于当前版本的Spring引导,我想让它适用于当前版本。 基本文件夹结构: 当