这是我的producer microservice kafka配置:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapService;
//producer factory
@Bean
public ProducerFactory<String, Object> producerFactory (){
Map<String, Object> configMap = new HashMap<>();
configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configMap);
}
//inviare messaggi
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
}
我发送的信息如下:
kafkaTemplate.send(TOPIC_NAME, message);
我和制片人没有任何问题,
这是消费者微服务Kafka配置:
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
return Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
ConsumerConfig.GROUP_ID_CONFIG, "testId"
);
}
}
使用相同的配置,但使用Object的字符串istead,或使用自定义对象替代,我得到以下stacktrace:
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
... 18 common frames omitted
对于让生产者和消费者参与不同的服务,你有什么建议吗?
[编辑]我正在添加侦听器,因为有人问:
@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage){
log.info("message: {}", rcvMessage);
}
请注意,在两个项目中都使用相同的参数定义了消息类。
[编辑2]它现在可以工作了,我删除了反序列化器中的标头,如下所示:
@Bean
public ConsumerFactory<String, Message> consumerFactory() {
JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
这是可以接受的还是只是一种变通方法?
当您向kafka发送消息时,会随消息一起发送一个标头。默认情况下,标头内是自定义对象的完整地址。
例如:
com.example.kafkaproducer.model.Message
消息类的消费者端必须在生产者的同一路径中创建。因为在消费者端接收消息时进行验证,如果指定了路径以外的路径,则会收到以下错误。
原因:java.lang.ClassNotFoundException:com.example.kafkaproducer.model.消息
很可能,如果您在消费者端更改Message类路径,您的错误将得到修复。
我有两个微服务,例如A和B。微服务B有剩余的enpoint,必须只能从微服务A访问。如何限制微服务之间的访问?如果可能的话,最佳做法是什么? 我正在使用Spring Cloud Security(oAuth2, jwt)。
我将非常感谢任何答复以及文章,在您的意见可能有助于这里。提前谢谢你。
为了这个问题,我正在做一个项目,其中有两个微服务: null 根据我所读到的,为了进行这种更改,我们需要做以下几点: 从->切换 更改配置以启用分布式命令 使用SpringCloud或JCloud连接微服务 将AxonFramework添加到遗留InvoiceService项目并处理接收到的saga事件。 这是我们遇到麻烦的第四点:发票服务是由一个不愿意进行更改的单独团队维护的。 在这种情况下,使
我有个计时器工作 现在,在大多数情况下,这会给我正确的结果。但如果有两个微服务实例同时执行这段代码,会发生什么呢?
我正在写一个微服务的应用程序,由Spring Boot和Spring云。我有五个模块 API网关(基于Spring云网关spect) Discovery-Server(基于Spring云NetflixEureka服务发现) Microservice-A(它是一个包含我们业务的Spring引导应用程序) Microservice-B(它是一个包含我们业务的Spring引导应用程序) Microser
当前体系结构: 问题: 我们在前端和后端层之间有一个两步流程。 null 微服务2(MS2)需要验证I1的完整性,因为它来自前端。如何避免对MS1进行新的查询?最好的办法是什么? 我试图优化的流删除了步骤1.3和2.3 流程1: null 流程2: 2.1用户X已在本地/会话存储中存储了数据(MS2_Data) 2.2用户X在MS1上保留数据(MS2_Data+MS1_Data) 2.3 MS1使