当前位置: 首页 > 知识库问答 >
问题:

Spring boot Kafka在不同的微服务之间发送对象

曹泉
2023-03-14

这是我的producer microservice kafka配置:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapService;

    //producer factory
    @Bean
    public ProducerFactory<String, Object> producerFactory (){
        Map<String, Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapService);
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configMap);
    }

    //inviare messaggi
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(){
        return new KafkaTemplate<>(producerFactory());
    }
}

我发送的信息如下:

kafkaTemplate.send(TOPIC_NAME, message);

我和制片人没有任何问题,

这是消费者微服务Kafka配置:

@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
        jsonDeserializer.addTrustedPackages("*");
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class,
                ConsumerConfig.GROUP_ID_CONFIG, "testId"
        );
    }
}

使用相同的配置,但使用Object的字符串istead,或使用自定义对象替代,我得到以下stacktrace:

    java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1763) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1286) ~[spring-kafka-2.8.3.jar:2.8.3]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition kafka-topic-2-0 at offset 25. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1510) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1500) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1328) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1237) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 3 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.kafkaproducer.model.Message]; nested exception is java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:142) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:103) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:569) ~[spring-kafka-2.8.3.jar:2.8.3]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1420) ~[kafka-clients-3.0.0.jar:na]
    ... 15 common frames omitted
Caused by: java.lang.ClassNotFoundException: com.example.kafkaproducer.model.Message
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) ~[na:na]
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) ~[na:na]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.boot.devtools.restart.classloader.RestartClassLoader.loadClass(RestartClassLoader.java:145) ~[spring-boot-devtools-2.6.4.jar:2.6.4]
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ~[na:na]
    at java.base/java.lang.Class.forName0(Native Method) ~[na:na]
    at java.base/java.lang.Class.forName(Class.java:398) ~[na:na]
    at org.springframework.util.ClassUtils.forName(ClassUtils.java:284) ~[spring-core-5.3.16.jar:5.3.16]
    at org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:138) ~[spring-kafka-2.8.3.jar:2.8.3]
    ... 18 common frames omitted

对于让生产者和消费者参与不同的服务,你有什么建议吗?

[编辑]我正在添加侦听器,因为有人问:

@KafkaListener(topics = TOPIC_NAME, groupId = "testId")
public void listener(@Payload Message rcvMessage){

    log.info("message: {}", rcvMessage);

}

请注意,在两个项目中都使用相同的参数定义了消息类。

[编辑2]它现在可以工作了,我删除了反序列化器中的标头,如下所示:

@Bean
public ConsumerFactory<String, Message> consumerFactory() {
    JsonDeserializer<Message> jsonDeserializer = new JsonDeserializer<>(Message.class, false);
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

这是可以接受的还是只是一种变通方法?

共有1个答案

乐正玺
2023-03-14

当您向kafka发送消息时,会随消息一起发送一个标头。默认情况下,标头内是自定义对象的完整地址。

例如:

 com.example.kafkaproducer.model.Message

消息类的消费者端必须在生产者的同一路径中创建。因为在消费者端接收消息时进行验证,如果指定了路径以外的路径,则会收到以下错误。

原因:java.lang.ClassNotFoundException:com.example.kafkaproducer.model.消息

很可能,如果您在消费者端更改Message类路径,您的错误将得到修复。

 类似资料:
  • 我有两个微服务,例如A和B。微服务B有剩余的enpoint,必须只能从微服务A访问。如何限制微服务之间的访问?如果可能的话,最佳做法是什么? 我正在使用Spring Cloud Security(oAuth2, jwt)。

  • 我将非常感谢任何答复以及文章,在您的意见可能有助于这里。提前谢谢你。

  • 为了这个问题,我正在做一个项目,其中有两个微服务: null 根据我所读到的,为了进行这种更改,我们需要做以下几点: 从->切换 更改配置以启用分布式命令 使用SpringCloud或JCloud连接微服务 将AxonFramework添加到遗留InvoiceService项目并处理接收到的saga事件。 这是我们遇到麻烦的第四点:发票服务是由一个不愿意进行更改的单独团队维护的。 在这种情况下,使

  • 我有个计时器工作 现在,在大多数情况下,这会给我正确的结果。但如果有两个微服务实例同时执行这段代码,会发生什么呢?

  • 我正在写一个微服务的应用程序,由Spring Boot和Spring云。我有五个模块 API网关(基于Spring云网关spect) Discovery-Server(基于Spring云NetflixEureka服务发现) Microservice-A(它是一个包含我们业务的Spring引导应用程序) Microservice-B(它是一个包含我们业务的Spring引导应用程序) Microser

  • 当前体系结构: 问题: 我们在前端和后端层之间有一个两步流程。 null 微服务2(MS2)需要验证I1的完整性,因为它来自前端。如何避免对MS1进行新的查询?最好的办法是什么? 我试图优化的流删除了步骤1.3和2.3 流程1: null 流程2: 2.1用户X已在本地/会话存储中存储了数据(MS2_Data) 2.2用户X在MS1上保留数据(MS2_Data+MS1_Data) 2.3 MS1使