当前位置: 首页 > 知识库问答 >
问题:

JAVAlang.ClassCastException:class model-在主题中将JSON对象作为消息发送时,springboot kafka集成中出错

屈宏爽
2023-03-14

我是Kafka新手,mymodel类用户[请求处理失败;嵌套异常为org.apache.Kafka.common.errors.SerializationException:无法将class model.User的值转换为value.serializer中指定的类org.apache.Kafka.common.serialization.StringSerializer],根本原因是java。ClassCastException:类模型。无法将用户强制转换为java类。org上的lang.String(model.User位于加载器“app”的未命名模块中;java.lang.String位于加载器“bootstrap”的java.base模块中)。阿帕奇。Kafka。常见的序列化。字符串序列化程序。序列化(StringSerializer.java:28)~[kafka-clients-2.7.1.jar:na]at*

我怀疑这是由于Kafka配置中错误导入了StringSerializer和JSONSerializer。下面是我的代码

1-Kafka形象

package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.json.JsonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import com.fasterxml.jackson.databind.ser.std.StringSerializer;

import model.User;

@Configuration
public class KafkaConfiguration {
    
    @Bean
    public ProducerFactory<String,User> producerFactory()
    {
        Map<String,Object> config=new HashMap<>();
        
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String,User> kafkaTemplate()
    {
        return new KafkaTemplate<>(producerFactory());
    }

}

2用户资源类

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import model.User;

@RestController
@RequestMapping("kafka")
public class UserResource {

    @Autowired
    KafkaTemplate<String,User> kafkatemplate;
    public static final String TOPIC="Kafka_Example";

    
    @GetMapping("/publish/{name}")
    public String postMessage(@PathVariable("name") final String name)
    {
        
    kafkatemplate.send(TOPIC,new User(name,"Technology",12000L));
    
    return "Published successfully";
    }
}

3-用户类

package model;

public class User {

    private String name;
    private String dept;
    private long salary;
    
    
    public User(String name, String dept, long salary) {
        super();
        this.name = name;
        this.dept = dept;
        this.salary = salary;
    }
    

    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getDept() {
        return dept;
    }
    public void setDept(String dept) {
        this.dept = dept;
    }
    public long getSalary() {
        return salary;
    }
    public void setSalary(long salary) {
        this.salary = salary;
    }
    
    
    
}

谁能告诉我哪里出了问题?是否与进口有关(如果是,正确的是什么)?

谢啦

共有2个答案

宇文学博
2023-03-14

你的代码正确,但你导入错误StringSerializer使用下面导入

org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer
常源
2023-03-14

解决方案

  1. 你需要用这个类导入字符串序列化器和json序列化器
 org.apache.kafka.common.serialization.StringSerializer
 org.springframework.kafka.support.serializer.JsonSerializer

我跟着这两件事,问题已经解决了

 类似资料:
  • 问题内容: 我有两个servlet:第一个servlet与客户端相似,并创建了一个以调用第二个servlet。 我想发送一个特殊的错误,格式类似于JSON对象,因此我以这种方式调用sendError方法: 但是在第一个servlet中,当我读取方法错误时,我只是得到标准的HTTP消息,而不是作为字符串的json对象。 如何获取json字符串? 问题答案: 从javadoc: 服务器默认创建响应,使

  • 我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)

  • 目标 我想发送一个消息到一个主题,我将处理稍后与客户机应用程序。为此,我使用Spring Boot和Spring Integration Java DSL及其JMS模块。作为消息代理,我使用本机ActiveMQ Artemis。 这是我的设置 作为ActiveMQ Artemis服务器,我使用具有默认配置的Vromero/Artemis(2.6.0)docker映像。 问题所在 在producer

  • 我想产生一个Kafka主题的信息。该消息应该具有以下模式: 我知道这是一个json模式,那么如何将json转换成字符串呢?

  • 问题内容: 我使用以下代码发送邮件。文本消息发送正常,但带有附件的邮件不起作用,它给出了异常。 javax.mail.MessagingException:发送消息时发生IOException;嵌套的异常是:javax.activation.UnsupportedDataTypeException:MIME类型为multipart / mixed的无对象DCH;boundary =“ ---- =

  • 我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。