编辑
在较新的Kafka客户端中,实现serializer
而不是encoder
。
编写自定义序列化程序所需的内容有:
如果遵循Kafka的Producer示例,您将通过Properties
对象构造ProducerConfig
。在构建属性文件时,请确保包括:
props.put("serializer.class", "path.to.your.CustomSerializer");
带有要让Kafka在将消息追加到日志之前用于序列化消息的类的路径。
编写Kafka可以正确解释的自定义序列化程序需要实现Kafka提供的encoder[T]
scala类。在java中实现traits很奇怪,但在我的项目中序列化JSON的方法如下:
public class JsonEncoder implements Encoder<Object> {
private static final Logger logger = Logger.getLogger(JsonEncoder.class);
// instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
private static final ObjectMapper objectMapper = new ObjectMapper();
public JsonEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(Object object) {
try {
return objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
return "".getBytes();
}
}
您的问题听起来像是在为添加到日志中的所有消息使用一个对象(我们称之为customMessage
)。如果是这种情况,那么序列化程序可能更像这样:
package com.project.serializer;
public class CustomMessageEncoder implements Encoder<CustomMessage> {
public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
/* This constructor must be present for successful compile. */
}
@Override
public byte[] toBytes(CustomMessage customMessage) {
return customMessage.toBytes();
}
}
这将使您的属性配置如下所示:
props.put("serializer.class", "path.to.your.CustomSerializer");
问题内容: 序列化器很少,例如 我们如何创建自己的自定义序列化程序? 问题答案: 在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。 我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。 在生产者端序列化MyMessage。 您应该创建一个实现org.apache.kafka.comm
如何创建自己的自定义序列化程序?
easyopen序列化使用fastjson处理json,xstream处理xml。现在我们来自定义实现一个json处理: 新建一个类JsonFormatter,实现ResultSerializer接口 public class JsonFormatter implements ResultSerializer { @Override public String serialize(
我正在使用Spring Kafka集成,我有自己的值通用序列化器/反序列化器,如下所示 序列化程序: 反序列化程序: 序列化程序工作得很好,但是当涉及到在消费消息时反序列化值时,我得到了一个而不是所需的对象,请告诉我我错在哪里,提前感谢。
我正在构建一个简单的项目与Spring boot和sping-kafka,我不能配置它,使其工作,它是一个简单的应用程序,生成笔记(作者,内容,createddatetime,lastmodefieddatetime)和发送基于笔记的事件,当他们被创建。 我已经玩了两天了,但我想我还没学会。 这是我的配置,我很确定它有很多锅炉板,但我已经用了几个例子来使我的工作。 我有2个生产者和消费者工厂,因为
我正在将Spark Scala应用程序Kafka API升级到0.10版。我曾经创建自定义方法来反序列化字节字符串格式的消息。 我已经意识到有一种方法可以将StringDeserializer或ByteArrayDeserializer作为参数传递给键或值。 但是,我找不到有关如何创建自定义Avro模式反序列化器的任何信息,以便我的kafkaStream在创建DirectStream和使用Kafk