当前位置: 首页 > 知识库问答 >
问题:

Spring boot应用程序未使用kafka和restcontroller启动

鲁羽
2023-03-14

我有一个运行良好的Spring启动应用程序,直到我在我的应用程序中包含Kafka消费者和生产者。运行完全没有问题的代码是有一个restController,如下所示:

@RestController
public class OrderResource {
    //Get orderheaderkeys for a particular date
    //OrderLine
    @GetMapping("/orderForDate/{forDate}")
    public List<String> findOrderHeaderKeys(@PathVariable String forDate) {
        //Some business logic
        return keys;
    }
}

这个rest终点给出了期望的响应。现在,我包括Kafka制作人和消费者

@Component
public class KafkaProducerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaProducerClient.class);
    private KafkaProducer<String, String> producer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
    }

    public void sendMessageAsync(String topic, String key, String jsonString) {
        logger.info("Sending message async to kafka topic with key = {}", key);
        long startTime = System.currentTimeMillis();
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, jsonString);

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                final long timeTaken = System.currentTimeMillis() - startTime;
                if (recordMetadata != null) {
                    logger.info("Producer sent record(key={}, value={}). " +
                                    "Topic={}, Partition={}, Offset={}, timeTaken={}",
                            record.key(), record.value(), topic, recordMetadata.partition(),
                            recordMetadata.offset(), String.valueOf(timeTaken));
                }
                if (exception != null) {
                    logger.error("Exception occurred while posting message", exception.getMessage());
                    return;
                }
            }
        });
        logger.info("Message sent to kafka topic with key = {}", key);
    }

    public void sendMessageSync(String topic, String key, String jsonString) {
        try {
            logger.info("Sending message sync to kafka topic={} with key={}", topic, key);
            long startTime = System.currentTimeMillis();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, jsonString);
            Future<RecordMetadata> future = producer.send(record);
            producer.flush();
            RecordMetadata recordMetadata = future.get();
            final long timeTaken = System.currentTimeMillis() - startTime;
            if (recordMetadata != null) {
                logger.info(
                        "Producer sent message by sendMessageSync. record={}. timeTaken={}",
                        recordMetadata,
                        String.valueOf(timeTaken));
            }
        } catch (Exception ex) {
            logger.error("Exception occured....", ex);
        }

    }

    @PreDestroy
    private void shutdown(){
        producer.close();
    }
}
@Component
public class KafkaConsumerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerClient.class);
    private KafkaConsumer<String, String> consumer;

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.topic}")
    private String topic;

    @Value("${zookeeper.groupId}")
    private String groupId;

    @PostConstruct
    public void init() {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord record : records) {
                try {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    //orderResource.saveOrderToSecondaryStore(record.value().toString());
                }catch (Exception e){
                    logger.error("Exception while processing Kafka message", e);
                }
            }
        }
    }
}

在包含这些消费者和生产者之后,我的应用程序没有启动。我没有看到之前应用程序运行正常时显示的以下行。

2019-12-12 15:01:12.090信息38376---[重新启动主]o.s.b.w.embedded。公猫TomcatWebServer:Tomcat在端口8080(http)上启动,上下文路径为2019-12-12 15:01:12.093信息38376---[restartedMain]c.w.c.o.p.MySpringApplication:15.187秒内启动MySpringApplication(JVM运行15.617)

共有2个答案

呼延臻
2023-03-14

我通过将消费者轮询(while循环)移到KafkaConsumerClient的init方法之外,解决了这个问题

@PostConstruct
public void init() {
}
赫连捷
2023-03-14

在Tomcat服务器上部署Spring Boot应用程序:更新pom。xml:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
        <scope>provided</scope>
    </dependency>

更新主应用程序类:

@SpringBootApplication
public class Application extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

和更新kafka依赖和配置也为更多的细节如下链接-https://www.confluent.io/blog/apache-kafka-spring-boot-application/

 类似资料:
  • 我有一个Spring Boot应用程序,其中有一个Kafka消费者和生产者。还有一个bean来创建主题。 我的Spring Boot应用程序和Kafka都是在Kubernetes的Docker启动的。有时Spring Boot应用程序在Kafka pod启动之前就启动了,因此无法启动,因为用户无法连接(参见stacktrace)。 有没有一种方法可以让我的应用程序以弹性的方式启动?例如,消费者应该

  • 问题内容: 我正在使用Spring Framework 4.0.7,MVC和Rest 我可以在以下方面安心工作: 例如: 用的方法(只是创建) 退还一些东西 工作良好 我可以用 : (我知道它与+ 相同) 例如: 用的方法(只是创建) 退还一些东西 我的问题是: 如果 出于确凿的原因 或 特定情况, 必须强制使用另一种选择 如果(1)无关紧要,则建议采用什么方法以及为什么。 问题答案: 旨在表示整

  • 使用spring-boot时,一切工作都很好。尽管如此,在spring-boot中已删除了注释和。我试图将代码重构为新版本,但我做不到。对于以下测试,我的应用程序在测试之前没有启动,http://localhost:8080返回404: 如何重构测试以使其在Spring-Boot1.5中工作?

  • null 例如: 方法(只创建) 归还某物 null null

  • 它永远不会结束,应用程序也不会响应: 我已经检查了以下几点: 我的应用程序扩展了SpringBootServletInitializer 我将初学者tomcat依赖项放在提供的中 war名为“EdgeCustomerOfferStorageWeb.war”,实例端口为10080,因此我使用:http://server:10080/EdgeCustomerOfferStorageWeb/It不响应,

  • 我正试图将运行在tomcat上的现有应用程序更改为SpringBoot。它一直运行到真正的SpringBoot启动。我有一个类似的应用程序运行在SpringBoot上。这就是我知道它一直运行到Springboot的原因。 我的主要方法: 我尝试使用@componentscan运行main方法,该方法具有如下所示的basePackages: 这无济于事。我尝试在main类的顶部添加@SpringBo