当前位置: 首页 > 知识库问答 >
问题:

发送到Kafka主题时反序列化对象时出错

施飞驰
2023-03-14

我对Kafka很陌生。我正在尝试发送一个消息到Kafka主题,其中包含头和有效载荷。

以下是错误:

"org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.cabservice.request.CabLocationPayload to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer\nCaused by: java.lang.ClassCastException: class com.cabservice.request.CabLocationPayload cannot be cast to class java.lang.String 
public class CabLocationPayload {

private Header header;

private Payload payload;

@PostMapping(value=“/publish”)public void sendMessageToKafkaTopic(@RequestBody CabLocationPayload CabLocationPayload){

Header和Payload具有JSON的映射字段。

在Producer中更改VALUE_SERIALIZER_CLASS_CONFIG后,我可以看到数据。但ClassCastException仍然失败。

    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(configProps);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
}

当前错误为{2 09:41:20.108信息22561--[ad producer-1]org.apache.kafka.clients.metadata:[ProducerClientID=producer-1]群集ID:lWghv-b_RG-_hO-qOp_cjA 2021-04-22 09:41:20.123错误22561--[nio-9080-exec-2]o.a.cc.[.[.[/].[dispatcherServlet]:路径为[]的上下文中servlet[dispatcherServlet]的servlet.Service()引发异常[请求处理失败;嵌套异常为

java.lang.ClassCastException:类com.cabservice.request.cabLocationPayload不能强制转换为类java.lang.String(com.cabservice.request.cabLocationPayload位于加载程序org.springframework.boot.devtools.restart.ClassLoader.restartClassLoader@1144043d的未命名模块中;java.lang.String位于加载程序“bootstrap”的模块java.base中),位于

任何帮助都是非常感谢的。

共有1个答案

鲁博赡
2023-03-14

Kafka配置“value.Serializer”配置应该是序列化程序子类,而不是您的对象类型

例如:

键:VALUE_SERIALIZER_CLASS_CONFIG,值:jsonserializer.class(来源:org.springframework.kafka.support.serializer)

    @EnableKafka
    @Configuration
    public class KafkaProducerConfiguration {
    
        @Bean
        KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(getConfig());
        }
    
        private Map<String, Object> getConfig() {
            Map<String, Object> config = new HashMap<>();
    
            config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "brokers");
            config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return config;
        }
    
    }

您必须将您的类替换为您想要消耗的类名。(对于本例:CabLocationPayload)

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfiguration {


    private Map<String, Object> consumerConfigs() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your brokers");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumeer-group-id");
        return props;
    }

    @Bean
    public ConsumerFactory<String, YourClass> kafkaListenerConsumerFactory() {
        final ErrorHandlingDeserializer<YourClass> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(new JsonDeserializer<>(YourClass.class, false));
        return new DefaultKafkaConsumerFactory<>(this.consumerConfigs(), new StringDeserializer(), errorHandlingDeserializer);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, YourClass> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, YourClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(this.kafkaListenerConsumerFactory());
        return factory;
    }

}
 类似资料:
  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 问题内容: 我需要转换从REST API获得的JSON数据,并将其转换为CSV以便进行分析。问题在于JSON数据不一定遵循相同的内容,因此我无法定义映射类型。这已经成为一项挑战,占用了我太多时间。我已经创建了一些代码,但是由于它在此行上引发了异常,因此它当然不起作用 错误是: 附加信息:无法将当前JSON对象(例如{“ name”:“ value”})反序列化为类型’System.Collecti

  • 我使用Kafka Consumer API来构建Consumer。为了构建反序列化器,我实现了Deserializer类并提供了必要的实现。我收到此错误“Exception Raisedorg.apache.kafka.Common.Errors.SerializationException:错误反序列化分区staging.DataFeeds.PartnerHotel-0的键/值,偏移量为1920

  • 我有一个非常大的JSON文件太大,无法使用ObjectMapper.read值()到JsonNode的问题。我想在Android上使用杰克逊库解析大型JSON时使用内存不足错误的解决方案,除了JSON文件是一个具有字段名称的单个对象,因此我无法创建一个模型POJO来反序列化。 对象内部的每个属性都有相同的格式,我可以忽略这些内部对象的许多属性(我已经有了一个POJO类来建模)。如果JSON文件是一

  • 错误: java.lang.ClassNotFoundException:testprocedure.tp$3在java.net.URLClassLoader$1上运行(未知源)在java.net.URLClassLoader上运行(未知源)在java.security.accessController.doprivileged(本机方法)在java.net.URLClassLoader.find

  • 问题内容: 我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误 org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serializ