I have a Spring Boot application with the Kafka dependency. There are two Kafka topics it needs to read messages from:
tacocloud.orders.topic
tacocloud.tacos.topic
Messages have already been sent to both topics successfully.
The Kafka configuration that listens to these topics looks like this:
package com.example.tacocloud.config;

import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {

    // Order
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.order")
    public Map<String, Object> orderConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(orderConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> order1KafkaMessageListenerContainer(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
            @Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(orderConsumerFactory);
        return factory;
    }

    // Taco
    @Bean
    @ConfigurationProperties("spring.kafka.consumer.taco")
    public Map<String, Object> tacoConsumerConfig() {
        return new HashMap<>();
    }

    @Bean
    public DefaultKafkaConsumerFactory tacoConsumerFactory(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig) {
        return new DefaultKafkaConsumerFactory<>(tacoConsumerConfig);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory tacoConcurrentMessageListenerContainer(
            @Qualifier("tacoConsumerConfig") Map<String, Object> tacoConsumerConfig,
            @Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(tacoConsumerFactory);
        return factory;
    }
}
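A quick way to see what the @ConfigurationProperties binding actually produces is to log the injected map at startup. This extra bean is a sketch only, not part of the original setup; it would go inside the KafkaConfig class above, relies on the @Slf4j logger already declared there, and needs an additional import for org.springframework.boot.ApplicationRunner:

    // Sketch: prints what was bound from "spring.kafka.consumer.order"
    // before the consumer factory uses it.
    @Bean
    public ApplicationRunner logOrderConsumerConfig(
            @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
        // Shows whether the keys arrive flat ("bootstrap.servers", ...) or nested.
        return args -> log.info("orderConsumerConfig = {}", orderConsumerConfig);
    }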
So there are two DefaultKafkaConsumerFactory beans and two ConcurrentKafkaListenerContainerFactory beans. After that, I created a service with @KafkaListener methods that log the messages:
package com.example.tacocloud.service;

import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class KafkaService {

    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one")
    public void order() {
        System.out.println("Order");
    }

    @KafkaListener(topics = "tacocloud.tacos.topic", groupId = "two")
    public void taco() {
        System.out.println("Taco");
    }
}
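For reference, a listener can also point at a specific container factory by bean name and receive the deserialized payload directly. A minimal sketch for the order listener, assuming the bean names from the KafkaConfig class above and an import for com.example.tacocloud.model.jpa.Order:

    // Sketch only: containerFactory refers to the ConcurrentKafkaListenerContainerFactory
    // bean defined in KafkaConfig; the Order parameter assumes the configured value
    // deserializer yields Order instances.
    @KafkaListener(topics = "tacocloud.orders.topic", groupId = "one",
            containerFactory = "order1KafkaMessageListenerContainer")
    public void order(Order order) {
        System.out.println("Order received: " + order);
    }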
The application.yml file:
spring:
  kafka:
    consumer:
      order:
        topic: tacocloud.orders.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.OrderDeserializer
        template:
          "[default.topic]": tacocloud.orders.topic
      taco:
        topic: tacocloud.tacos.topic
        "[bootstrap.servers]": localhost:29888
        "[key.deserializer]": org.apache.kafka.common.serialization.StringDeserializer
        "[value.deserializer]": com.example.tacocloud.model.serialization.TacoDeserializer
        template:
          "[default.topic]": tacocloud.tacos.topic
But when I start my Spring Boot application, I see an error message (((
2022-04-15 21:38:35.918 ERROR 13888 --- [restartedMain] o.s.boot.SpringApplication : Application run failed
Process finished with exit code 0
Thank you for the sample.
So, I opened it locally and put a breakpoint in this bean definition:
@Bean
public DefaultKafkaConsumerFactory<Object, Order> orderConsumerFactory(
        @Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig) {
    return new DefaultKafkaConsumerFactory<Object, Order>(orderConsumerConfig);
}
That orderConsumerConfig map looks like this:
orderConsumerConfig = {LinkedHashMap@8587} size = 1
 "orderConsumerConfig" -> {HashMap@8600} size = 5
  key = "orderConsumerConfig"
  value = {HashMap@8600} size = 5
   "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
   "template" -> {LinkedHashMap@8611} size = 1
   "topic" -> "tacocloud.orders.topic"
   "bootstrap.servers" -> "localhost:29888"
   "value.deserializer" -> "sample.kafka.serializer.OrderDeserializer"
So it is not surprising that your KafkaConsumer is not initialized correctly: your target map configuration is hidden under the orderConsumerConfig entry of that map.
Would you mind sharing where you learned about putting @ConfigurationProperties on a Map bean? And how is that supposed to be injected as a dependency?
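One possible way around this, not taken from the original thread, is to drop the raw Map bean and bind the per-topic settings into a typed @ConfigurationProperties class, then build the consumer configuration explicitly with ConsumerConfig constants so nothing ends up nested. A minimal sketch for the order side; the property prefix and class names are hypothetical, and Lombok's @Data is assumed to be available since the project already uses @Slf4j:

// Hypothetical alternative configuration, shown only for the order consumer.
package com.example.tacocloud.config;

import java.util.HashMap;
import java.util.Map;

import com.example.tacocloud.model.jpa.Order;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableConfigurationProperties(OrderKafkaConfig.OrderKafkaProperties.class)
public class OrderKafkaConfig {

    // Bound from a hypothetical "tacocloud.kafka.order.*" prefix in application.yml.
    @Data
    @ConfigurationProperties("tacocloud.kafka.order")
    public static class OrderKafkaProperties {
        private String bootstrapServers;
        private String keyDeserializer;
        private String valueDeserializer;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, Order> orderConsumerFactory(OrderKafkaProperties props) {
        // Build the consumer config explicitly, so the keys are exactly the ones Kafka expects.
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getBootstrapServers());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, props.getKeyDeserializer());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, props.getValueDeserializer());
        return new DefaultKafkaConsumerFactory<>(config);
    }
}

The taco side would get an analogous properties class and factory, and each @KafkaListener would then select its container factory by bean name.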