我有我的自定义Java对象,希望利用JVM的内置序列化将其发送到Kafka主题,但是序列化失败并出现以下错误
org.apache.kafka.common.errors.SerializationException:无法将com.spring.kafka.Payload类的值转换为value.serializer中指定的org.apache.kafka.common.serialization.ByteArraySerializer类。
Payload.java
public class Payload implements Serializable {
private static final long serialVersionUID = 123L;
private String name="vinod";
private int anInt = 5;
private Double aDouble = new Double("5.0");
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAnInt() {
return anInt;
}
public void setAnInt(int anInt) {
this.anInt = anInt;
}
public Double getaDouble() {
return aDouble;
}
public void setaDouble(Double aDouble) {
this.aDouble = aDouble;
}
}
在创建生产者的过程中,我设置了以下属性
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.ByteArraySerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.ByteArraySerializer" />
我的发送调用如下
kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));
通过生产者向kafka主题发送自定义Java对象的正确方法是什么?
我们有以下两个选项
1)如果我们打算将自定义Java对象发送给生产者,则需要创建一个实现
org.apache.kafka.common.serialization.Serializer
的序列化器,并在创建生产者期间传递该Serializer类
下面的代码参考
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {
public void configure(Map map, boolean b) {
}
public byte[] serialize(String s, Object o) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}
}
public void close() {
}
}
并据此设置值序列化器
<entry key="value.serializer"
value="com.spring.kafka.PayloadSerializer" />
2)无需创建自定义序列化器类。使用现有的ByteArraySerializer,但在发送过程中遵循该过程
Java对象->字符串(最好是JSON表示形式,而不是toString)-> byteArray
我有我的自定义Java对象,并希望在构建序列化中利用JVM将其发送到Kafka主题,但序列化失败,出现以下错误 org.apache.kafka.Common.Errors.SerializationException:无法将类com.spring.kafka.payload的值转换为value.Serializer中指定的类org.apache.kafka.Common.Serializatio
我在sping-boot应用程序中使用sping-kafka发送数据主题。我需要从oracle表中获取数据并发送它。 我从oracle表中获取列表。如何将它们发送到主题? 即。 > 有没有办法将它们作为列表发送?如果是,如何发送?如果是,那么如何在消费者端反序列化它? 是否可以使用spring book和spring kafka以流式方式发送数据?如果是,请提供更多信息或样本/片段plz。。。 如
问题内容: 我是否可以使用标准方法将自己的自定义对象添加到Map,然后将其正确编组到MapMessage中?当前,我收到无效对象类型消息。我注意到WebSphere有解决方案,但是我正在寻找不受特定AS约束的东西,如果没有这种方法,也许JBoss支持的东西会起作用。 如何在WebSphere中进行操作:http : //publib.boulder.ibm.com/infocenter/dmndh
我对Kafka很陌生。我正在尝试发送一个消息到Kafka主题,其中包含头和有效载荷。 以下是错误: @PostMapping(value=“/publish”)public void sendMessageToKafkaTopic(@RequestBody CabLocationPayload CabLocationPayload){ Header和Payload具有JSON的映射字段。 在Pro
问题内容: 我有一个新闻和一个消息。所述含有选择加入到阵列时teamObjects的的tableView。我想将此数组添加到其中,以便可以从其中包含需要teamObjects的url请求的访问它们。但是我不断得到: “试图为关键团队插入非财产列表对象(“”) 如果有比将其存储在更好的方法,我愿意接受其他建议 方法 我的对象 问题答案: 实际上,您将需要将自定义对象归档到其中,然后将其保存到用户默认
我试图使用pyspark将每日批次的数据发送到Kafka主题,但我当前收到以下错误: Traceback(最近的最后一次调用): File", line 5, in File"/usr/local/rms/lib/hdp26_c5000/park2/python/pyspark/sql/readwriter.py", line 548, in保存自己。_jwrite.save()File"/usr