我是Kafka新手,mymodel类用户[请求处理失败;嵌套异常为org.apache.Kafka.common.errors.SerializationException:无法将class model.User的值转换为value.serializer中指定的类org.apache.Kafka.common.serialization.StringSerializer],根本原因是java。ClassCastException:类模型。无法将用户强制转换为java类。org上的lang.String(model.User位于加载器“app”的未命名模块中;java.lang.String位于加载器“bootstrap”的java.base模块中)。阿帕奇。Kafka。常见的序列化。字符串序列化程序。序列化(StringSerializer.java:28)~[kafka-clients-2.7.1.jar:na]at*
我怀疑这是由于Kafka配置中错误导入了StringSerializer和JSONSerializer。下面是我的代码
1-Kafka形象
package config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.json.JsonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;
import model.User;
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory<String,User> producerFactory()
{
Map<String,Object> config=new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String,User> kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
}
}
2用户资源类
package com.example.demo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import model.User;
@RestController
@RequestMapping("kafka")
public class UserResource {
@Autowired
KafkaTemplate<String,User> kafkatemplate;
public static final String TOPIC="Kafka_Example";
@GetMapping("/publish/{name}")
public String postMessage(@PathVariable("name") final String name)
{
kafkatemplate.send(TOPIC,new User(name,"Technology",12000L));
return "Published successfully";
}
}
3-用户类
package model;
public class User {
private String name;
private String dept;
private long salary;
public User(String name, String dept, long salary) {
super();
this.name = name;
this.dept = dept;
this.salary = salary;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDept() {
return dept;
}
public void setDept(String dept) {
this.dept = dept;
}
public long getSalary() {
return salary;
}
public void setSalary(long salary) {
this.salary = salary;
}
}
谁能告诉我哪里出了问题?是否与进口有关(如果是,正确的是什么)?
谢啦
你的代码正确,但你导入错误StringSerializer使用下面导入
org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer
解决方案
org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer
我跟着这两件事,问题已经解决了
问题内容: 我有两个servlet:第一个servlet与客户端相似,并创建了一个以调用第二个servlet。 我想发送一个特殊的错误,格式类似于JSON对象,因此我以这种方式调用sendError方法: 但是在第一个servlet中,当我读取方法错误时,我只是得到标准的HTTP消息,而不是作为字符串的json对象。 如何获取json字符串? 问题答案: 从javadoc: 服务器默认创建响应,使
我是Kafka的新手,当我试图发送信息到我得到的主题下面的错误。有人能帮我一下吗? [2018-09-23 13:37:56,613]警告[Producer Clientid=Console-Producer]无法建立到节点-1的连接。代理可能不可用。(org.apache.kafka.clients.NetworkClient)
目标 我想发送一个消息到一个主题,我将处理稍后与客户机应用程序。为此,我使用Spring Boot和Spring Integration Java DSL及其JMS模块。作为消息代理,我使用本机ActiveMQ Artemis。 这是我的设置 作为ActiveMQ Artemis服务器,我使用具有默认配置的Vromero/Artemis(2.6.0)docker映像。 问题所在 在producer
我想产生一个Kafka主题的信息。该消息应该具有以下模式: 我知道这是一个json模式,那么如何将json转换成字符串呢?
问题内容: 我使用以下代码发送邮件。文本消息发送正常,但带有附件的邮件不起作用,它给出了异常。 javax.mail.MessagingException:发送消息时发生IOException;嵌套的异常是:javax.activation.UnsupportedDataTypeException:MIME类型为multipart / mixed的无对象DCH;boundary =“ ---- =
我有一个问题与产生的消息Kafka的主题。 我使用来自外部供应商的Kafka管理服务,所以我问他经纪人的状况,他说一切都好。顺便说一句,它发生在三个不同的Kafka实例上。Kafka客户端版本也无关紧要-0.11.0.0和2.0.1都有。