当前位置: 首页 > 知识库问答 >
问题:

Spring Kafka JsonDesirialization MessageConversionException无法解析类名

沈开畅
2023-03-14

我有两个服务应该通过Kafka进行通信。让我们调用第一个服务WriteService和第二个服务QueryService。

在WriteService方面,我为生产者提供了以下配置。

@Configuration
public class KafkaProducerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return props;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.groupid}")
    private String serviceGroupId;

    @Value("${spring.kafka.consumer.trusted-packages}")
    private String trustedPackage;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, serviceGroupId);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, trustedPackage);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

}

侦听器有以下定义。有效负载类具有完全限定名-com.example.project.clientQueryView.module.routes.messaging.kafka.routedto

@KafkaListener(topics = "${spring.kafka.topics.routes}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenForRoute(ConsumerRecord<String, RouteDto> cr,
                               @Payload RouteDto payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr.toString());
    }

    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value())).orElse("N/A");
    }

当发送消息时,我得到以下错误

由:org.springframework.messaging.converter.MessageConversionException引起:无法解析类名。类未找到[com.example.project.web.routes.dto.routedto];嵌套异常是java.lang.ClassNotFoundException:com.example.project.web.routes.dto.routedTo

如何在不手动配置映射器的情况下解决这个问题?

共有1个答案

吴康平
2023-03-14

如果您使用的是Spring-Kafka-2.2.x,则可以通过JSONDeserializer文档的重载构造函数禁用默认头

从版本2.2开始,您可以显式地配置反序列化器,使用提供的目标类型,并通过使用一个具有布尔useHeadersIfPresent(默认为true)的重载构造函数忽略标头中的类型信息。下面的示例演示如何这样做:

DefaultKafkaConsumerFactory<String, Object> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));

如果使用较低版本,则使用MessageConverter(您可能会在Spring-Kafka-2.1.x及更高版本中看到此问题)

@Bean
 public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RouteDto> factory =
    new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
  }

  @KafkaListener(topics = "jsonData",
            containerFactory = "kafkaJsonListenerContainerFactory")
    public void jsonListener(RouteDto dto) {
     ...
   }

注意:只有在方法级别声明@Kafkalistener注释时,才能实现这种类型推断。对于类级别的@KafKalistener,有效负载类型用于选择调用哪个@KafKaHandler方法,因此在选择该方法之前必须已经转换过它。

 类似资料:
  • 当我通过命令行或Jenkins运行Groovy脚本时,会出现“无法解析类”错误。 我在C中的同一文件夹中有以下两个groovy文件:\Users\myuser\git\productname\mycompany build\src\main\groovy\com\mycompany\build 富。groovy公司 酒吧groovy公司 我经营Foo。groovy使用命令行。 运行Groovy时,

  • 问题内容: 这是我得到的错误: 这是我的代码: 另外,我在一个单独的类中声明了这一点: 是什么导致此错误,我该如何解决? 编辑:缩进,空格,命名约定和可读性问题已得到解决。 问题答案: 问题在于,这似乎是一个内部类(很难说,您的缩进是不好的),并且由于它不是静态的,因此您无法单独实例化它。 您要么需要自己创建一个类,要么使其成为一个类并将其实例化为(或者无论父类是什么,您的缩进在这里实际上都没有帮

  • 问题内容: 尝试遵循本教程:教程 尝试像这样使用 JSONParser : 但是Eclipse给了我。 该怎么办? 问题答案: JSONParser类在本教程的下方,它看起来像这样…… 我会考虑看看杰克逊图书馆通过http://jackson.codehaus.org/

  • 问题内容: 我从git下载了Android的Browser项目并将其导入到Eclipse。但是由于以下错误,我无法构建它: 无法解析类型java.lang.Enum。从所需的.class文件间接引用它 现在…这是一个开发环境,对于其他项目来说效果很好: Windows 7 Ultimate 64位。 JDK 64位(已安装jdk-6u23-windows-x64.exe。) Eclipse Cla

  • 我刚开始使用Android Studio一周,它对我来说非常好,但当我今天开始使用Android Studio时,我得到了一个错误:“错误:重复类:mypackage”。R’。我以前在使用Eclipse时看到过这个错误,所以我尝试过几次重建项目并重新启动Android Studio,但都没有帮助。 在阅读了一些Stackoverflow问题后,我尝试删除R.java并再次重建,现在我在重建时没有收

  • 我正在尝试将主题添加到我的项目中。它需要appcompat v7。 我按照Android开发者网站中提到的步骤添加了支持库。经过大量搜索,我知道支持库v4将与支持库v7冲突。 所以我从我的项目中删除了android库v4。因此,现在我得到了t错误()。 我必须做什么?请有人帮我解决这个问题。预先感谢

  • 在我的testrunner类中,当使用时,我收到一条错误消息,说“cucumberoptions不能解析为类型”。我使用的是最新版本的eclipse cucumber依赖项。通过安装和卸载eclipse,通过降低cucumber依赖项版本,我已经尝试了所有可能的方法。 我的跑者班是:

  • 我创建了一个RunTest类来使用Cucumber和JUnit运行测试场景。要运行之前的测试,我需要将RunWith类(@RunWith)导入到我的项目中,并将Cucumber.class作为参数传递。然后,类的RunWith参数识别更多传递给它的参数,不。eclipse将显示以下消息: 无法将此行类中的多个标记解析为类型。-Cucumber无法解析为类型。-注释@runwith必须定义属性值 我