Question:

Spring Kafka key serializer not working for objects

徐嘉谊
2023-03-14

ControlChannel and SchedulerEntry are regular POJOs.

Environment:

  • Java 11
  • Spring Boot 2.4.1
  • Kafka 2.6.0

    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control, 
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
        @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }


    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.put(JsonSerializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props, 
            new JsonSerializer<ControlChannel>(),
            new JsonSerializer<SchedulerEntry>());
    }
   
    
    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS, "key:io.infolayer.aida.ControlChannel, value:io.infolayer.aida.entity.SchedulerEntry");
    
        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);
    
        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        k.configure(props, true);
    
        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Scheduler-0 at offset 25. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

**Second attempt: consumer and producer (just setting the key serializer/deserializer to JSON)**

    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props, new JsonDeserializer<>(ControlChannel.class), new JsonDeserializer<>(SchedulerEntry.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }


Exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: 
Listener method 'public void  io.infolayer.aida.scheduler.KafkaSchedulerListener.listenForScheduler(io.infolayer.aida.ControlChannel,long,io.infolayer.aida.entity.SchedulerEntry)' 
threw exception; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]; nested exception is org.springframework.core.convert.ConverterNotFoundException: 
No converter found capable of converting from type [io.infolayer.aida.entity.SchedulerEntry] to type [@org.springframework.messaging.handler.annotation.Header io.infolayer.aida.ControlChannel]

1 answer

华永逸
2023-03-14

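Your first attempt has a couple of problems.

  • You need to call configure() on the serializers, with add type info set to true (ADD_TYPE_INFO_HEADERS = true).
  • You call configure() twice on k and never configure v (the deserializers).

This works as expected...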

@SpringBootApplication
public class So65501295Application {

    private static final Logger log = LoggerFactory.getLogger(So65501295Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So65501295Application.class, args);
    }

    @Bean
    public ProducerFactory<ControlChannel, SchedulerEntry> schedulerProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, true);
        props.put(JsonSerializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                        + "value:com.example.demo.So65501295Application.SchedulerEntry");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        JsonSerializer<ControlChannel> k = new JsonSerializer<ControlChannel>();
        k.configure(props, true);
        JsonSerializer<SchedulerEntry> v = new JsonSerializer<SchedulerEntry>();
        v.configure(props, false);
        return new DefaultKafkaProducerFactory<>(props, k, v);
    }

    public ConsumerFactory<ControlChannel, SchedulerEntry> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, false);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(JsonDeserializer.TYPE_MAPPINGS,
                "key:com.example.demo.So65501295Application.ControlChannel, "
                        + "value:com.example.demo.So65501295Application.SchedulerEntry");

        JsonDeserializer<ControlChannel> k = new JsonDeserializer<ControlChannel>();
        k.configure(props, true);

        JsonDeserializer<SchedulerEntry> v = new JsonDeserializer<SchedulerEntry>();
        v.configure(props, false);

        return new DefaultKafkaConsumerFactory<>(props, k, v);
    }

    @KafkaListener(topics = "Scheduler", groupId = "scheduler", containerFactory = "schedulerKafkaListenerContainerFactory")
    public void listenForScheduler(
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) ControlChannel control,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp,
            @Payload SchedulerEntry entry) {

        log.info("received data KEY ='{}'", control);
        log.info("received data PAYLOAD = '{}'", entry);

        /* ... */

    }

    @Bean
    public KafkaTemplate<ControlChannel, SchedulerEntry> schedulerKafkaTemplate() {
        return new KafkaTemplate<>(schedulerProducerFactory());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> schedulerKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<ControlChannel, SchedulerEntry> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("scheduler"));
        return factory;
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<ControlChannel, SchedulerEntry> template) {
        return args -> {
            template.send("Scheduler", new ControlChannel(), new SchedulerEntry());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("Scheduler").partitions(1).replicas(1).build();
    }

    public static class ControlChannel {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

    public static class SchedulerEntry {

        String foo;

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

    }

}

2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   : received data KEY ='com.example.demo.So65501295Application$ControlChannel@44a72886'
2021-01-04 11:42:25.026  INFO 23905 --- [ntainer#0-0-C-1] com.example.demo.So65501295Application   : received data PAYLOAD = 'com.example.demo.So65501295Application$SchedulerEntry@74461c59'
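
The second problem listed in the answer (configure() called twice on k, never on v) is easy to overlook, so here is a minimal plain-Java sketch of that slip in isolation. `ToyDeserializer` and `ConfigureSlipDemo` are hypothetical stand-ins invented for this illustration: they only record how configure() was called and do not reproduce the behavior of spring-kafka's JsonDeserializer. The property name `example.setting` is likewise made up.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in: records configure() calls only; NOT the spring-kafka class. */
final class ToyDeserializer {
    private Map<String, Object> settings; // stays null until configure() runs
    private boolean key;                  // role passed to the last configure() call
    private int configureCalls;           // how many times configure() was invoked

    void configure(Map<String, Object> configs, boolean isKey) {
        this.settings = new HashMap<>(configs); // copy the supplied properties
        this.key = isKey;
        this.configureCalls++;
    }

    boolean isConfigured() { return settings != null; }
    boolean isKey() { return key; }
    int configureCalls() { return configureCalls; }
}

final class ConfigureSlipDemo {
    /** Mirrors the first attempt: both configure() calls land on k. */
    static ToyDeserializer[] buggy(Map<String, Object> props) {
        ToyDeserializer k = new ToyDeserializer();
        k.configure(props, true);
        ToyDeserializer v = new ToyDeserializer();
        k.configure(props, true); // copy-paste slip: v is never configured
        return new ToyDeserializer[] {k, v};
    }

    /** Mirrors the answer: k is configured as key, v as value. */
    static ToyDeserializer[] fixed(Map<String, Object> props) {
        ToyDeserializer k = new ToyDeserializer();
        k.configure(props, true);
        ToyDeserializer v = new ToyDeserializer();
        v.configure(props, false);
        return new ToyDeserializer[] {k, v};
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("example.setting", "anything"); // hypothetical property name

        ToyDeserializer[] bad = buggy(props);
        System.out.println("buggy: v configured = " + bad[1].isConfigured());

        ToyDeserializer[] good = fixed(props);
        System.out.println("fixed: v configured = " + good[1].isConfigured()
                + ", v is key = " + good[1].isKey());
    }
}
```

In the buggy variant the value instance never receives the properties at all, so whatever was put into the map (type mappings, trusted packages, and so on in the real code) cannot affect it. A quick check such as `isConfigured()` in a unit test is one possible way to catch this kind of slip early.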