当前位置: 首页 > 知识库问答 >
问题:

Spring boot Kafka类反序列化-不在受信任包中

公西承
2023-03-14
@Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");

        return props;
    }
spring:
kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: 'com.springmiddleware.entities'

并将这些行添加application.properties

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.springmiddleware.entities
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer 
spring.kafka.producer.properties.spring.json.add.type.headers=false

但仍会出现以下错误:

org.apache.kafka.common.errors.SerializationException:反序列化偏移量1处分区topic2-0的键/值时出错。如有需要,请通过记录继续使用。原因:java.lang.IllegalArgumentException:类“com.SpringMiddleware.Entities.Crime”不在受信任的包[java.util,java.lang]中。如果您认为反序列化该类是安全的,请提供它的名称。如果序列化仅由受信任的源完成,则还可以启用trust all(*)。

ReceiverConfig:

@EnableKafka
@Configuration
public class ReceiverConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {


        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.springmiddleware.entities");
        props.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, "false");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
                new JsonDeserializer<>());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

更新2

Listener Class (Receiver): 

    @KafkaListener(topics = "${app.topic.foo}")
@Service
public class Receiver {


     private CountDownLatch latch = new CountDownLatch(1);

        public CountDownLatch getLatch() {
            return latch;
        }

   @KafkaHandler
    public void listen(@Payload Crime message) {

            System.out.println("Received " + message);
    }

   @KafkaHandler
    public void listen(@Payload String message) {

        System.out.println("Received " +  message);
}

共有1个答案

芮承运
2023-03-14

只需使用重载的JSONDeserializer构造函数

从2.2版开始,您可以显式配置反序列化器,以使用提供的目标类型,并通过使用具有布尔值useHeadersIfPresent(默认为true)的重载构造函数之一忽略头中的类型信息。

下面的示例显示如何执行此操作:

DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
    new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(),  new StringDeserializer(),
            new JsonDeserializer<>(Object.class,false));
}
@KafkaListener(topics = "myTopic")
@Service
public class MultiListenerBean {

@KafkaHandler
public void listen(Cat cat) {
    ...
}

@KafkaHandler
public void listen(Hat hat) {
    ...
}

@KafkaHandler(isDefault = true)
public void delete(Object obj) {
    ...
   }

}
 类似资料:
  • 该消费者不需要受信任的包: 这突然发生了: 我已经尝试了以下方法,但它们不起作用,因为我只得到以下错误: < li> Spring Cloud Stream版本:3.1.2,带有Kafka Streams活页夹。 使用自定义JSON serde解决方法:

  • 我有一门课是这样的: 但是当我试图序列化它时,我收到一个错误,上面写着“试图序列化java.lang.class:java.lang.字符串。忘记注册一个类型适配器了吗?”。所以我创建了这个适配器: } 并登记如下: 但我还是犯了同样的错误<我做错了什么 适配器的实现看起来正常吗?

  • 我读了Doug Stevenson写的这些博客文章,第1部分,第2部分,第3部分。在第二部分,他说 Realtime Database SDK使将DataSnapshot转换为JavaBean样式的对象变得非常容易 并提到我们可以使用这行代码将DataSnapshot反序列化为HotStock对象(一个JavaBean类) HotStock stock=datasnapshot.getValue(

  • 我们正在构建一个通过RabbitMQ接收消息的Spring Boot应用程序(2.0.4版本)。因此,包含与rabbit相关的配置: 配置: 原因:java.lang.IllegalArgumentException:类“My.Fancy.Package.Clazz”不在受信任的包[java.util,java.lang]中。如果您认为反序列化该类是安全的,请提供它的名称。如果序列化仅由受信任的源

  • 我正在使用java spark API编写一些测试应用程序。我使用的是一个不扩展可序列化接口的类。因此,为了使应用程序正常工作,我使用kryo序列化器序列化类。但我在调试时观察到的问题是,在反序列化过程中,返回的类对象变为null,并反过来抛出一个null指针异常。这似乎是关闭的问题,事情是错误的,但不确定。因为我是新的这种系列化,我不知道从哪里开始挖掘。 下面是我正在测试的代码: 下面是我正在序