我有一个运行良好的Spring启动应用程序,直到我在我的应用程序中包含Kafka消费者和生产者。运行完全没有问题的代码是有一个restController,如下所示:
@RestController
public class OrderResource {
//Get orderheaderkeys for a particular date
//OrderLine
@GetMapping("/orderForDate/{forDate}")
public List<String> findOrderHeaderKeys(@PathVariable String forDate) {
//Some business logic
return keys;
}
}
这个rest终点给出了期望的响应。现在,我包括Kafka制作人和消费者
@Component
public class KafkaProducerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaProducerClient.class);
private KafkaProducer<String, String> producer;
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@PostConstruct
public void init() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<String, String>(properties);
}
public void sendMessageAsync(String topic, String key, String jsonString) {
logger.info("Sending message async to kafka topic with key = {}", key);
long startTime = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, jsonString);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
final long timeTaken = System.currentTimeMillis() - startTime;
if (recordMetadata != null) {
logger.info("Producer sent record(key={}, value={}). " +
"Topic={}, Partition={}, Offset={}, timeTaken={}",
record.key(), record.value(), topic, recordMetadata.partition(),
recordMetadata.offset(), String.valueOf(timeTaken));
}
if (exception != null) {
logger.error("Exception occurred while posting message", exception.getMessage());
return;
}
}
});
logger.info("Message sent to kafka topic with key = {}", key);
}
public void sendMessageSync(String topic, String key, String jsonString) {
try {
logger.info("Sending message sync to kafka topic={} with key={}", topic, key);
long startTime = System.currentTimeMillis();
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, jsonString);
Future<RecordMetadata> future = producer.send(record);
producer.flush();
RecordMetadata recordMetadata = future.get();
final long timeTaken = System.currentTimeMillis() - startTime;
if (recordMetadata != null) {
logger.info(
"Producer sent message by sendMessageSync. record={}. timeTaken={}",
recordMetadata,
String.valueOf(timeTaken));
}
} catch (Exception ex) {
logger.error("Exception occured....", ex);
}
}
@PreDestroy
private void shutdown(){
producer.close();
}
}
@Component
public class KafkaConsumerClient {
private static Logger logger = LoggerFactory.getLogger(KafkaConsumerClient.class);
private KafkaConsumer<String, String> consumer;
@Value("${kafka.bootstrap.servers}")
private String kafkaBootstrapServers;
@Value("${kafka.topic}")
private String topic;
@Value("${zookeeper.groupId}")
private String groupId;
@PostConstruct
public void init() {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
try {
logger.info("Key: " + record.key() + ", Value: " + record.value());
//orderResource.saveOrderToSecondaryStore(record.value().toString());
}catch (Exception e){
logger.error("Exception while processing Kafka message", e);
}
}
}
}
}
在包含这些消费者和生产者之后,我的应用程序没有启动。我没有看到之前应用程序运行正常时显示的以下行。
2019-12-12 15:01:12.090信息38376---[重新启动主]o.s.b.w.embedded。公猫TomcatWebServer:Tomcat在端口8080(http)上启动,上下文路径为2019-12-12 15:01:12.093信息38376---[restartedMain]c.w.c.o.p.MySpringApplication:15.187秒内启动MySpringApplication(JVM运行15.617)
我通过将消费者轮询(while循环)移到KafkaConsumerClient的init方法之外,解决了这个问题
@PostConstruct
public void init() {
}
在Tomcat服务器上部署Spring Boot应用程序:更新pom。xml:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
更新主应用程序类:
@SpringBootApplication
public class Application extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(Application.class);
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
和更新kafka依赖和配置也为更多的细节如下链接-https://www.confluent.io/blog/apache-kafka-spring-boot-application/
我有一个Spring Boot应用程序,其中有一个Kafka消费者和生产者。还有一个bean来创建主题。 我的Spring Boot应用程序和Kafka都是在Kubernetes的Docker启动的。有时Spring Boot应用程序在Kafka pod启动之前就启动了,因此无法启动,因为用户无法连接(参见stacktrace)。 有没有一种方法可以让我的应用程序以弹性的方式启动?例如,消费者应该
问题内容: 我正在使用Spring Framework 4.0.7,MVC和Rest 我可以在以下方面安心工作: 例如: 用的方法(只是创建) 退还一些东西 工作良好 我可以用 : (我知道它与+ 相同) 例如: 用的方法(只是创建) 退还一些东西 我的问题是: 如果 出于确凿的原因 或 特定情况, 必须强制使用另一种选择 如果(1)无关紧要,则建议采用什么方法以及为什么。 问题答案: 旨在表示整
使用spring-boot时,一切工作都很好。尽管如此,在spring-boot中已删除了注释和。我试图将代码重构为新版本,但我做不到。对于以下测试,我的应用程序在测试之前没有启动,http://localhost:8080返回404: 如何重构测试以使其在Spring-Boot1.5中工作?
null 例如: 方法(只创建) 归还某物 null null
它永远不会结束,应用程序也不会响应: 我已经检查了以下几点: 我的应用程序扩展了SpringBootServletInitializer 我将初学者tomcat依赖项放在提供的中 war名为“EdgeCustomerOfferStorageWeb.war”,实例端口为10080,因此我使用:http://server:10080/EdgeCustomerOfferStorageWeb/It不响应,
我正试图将运行在tomcat上的现有应用程序更改为SpringBoot。它一直运行到真正的SpringBoot启动。我有一个类似的应用程序运行在SpringBoot上。这就是我知道它一直运行到Springboot的原因。 我的主要方法: 我尝试使用@componentscan运行main方法,该方法具有如下所示的basePackages: 这无济于事。我尝试在main类的顶部添加@SpringBo