当前位置: 首页 > 知识库问答 >
问题:

编写自定义Kafka序列化程序

丌官丰羽
2023-03-14

我在一个Kafka消息中使用了我自己的类,它有一堆字符串数据类型。

我想我需要编写自己的序列化器并将其提供给生产者属性?

共有1个答案

刘俊语
2023-03-14

编辑

在较新的Kafka客户端中,实现serializer而不是encoder

编写自定义序列化程序所需的内容有:

    null

如果遵循Kafka的Producer示例,您将通过Properties对象构造ProducerConfig。在构建属性文件时,请确保包括:

props.put("serializer.class", "path.to.your.CustomSerializer");

带有要让Kafka在将消息追加到日志之前用于序列化消息的类的路径。

编写Kafka可以正确解释的自定义序列化程序需要实现Kafka提供的encoder[T]scala类。在java中实现traits很奇怪,但在我的项目中序列化JSON的方法如下:

public class JsonEncoder implements Encoder<Object> {
    private static final Logger logger = Logger.getLogger(JsonEncoder.class);
    // instantiating ObjectMapper is expensive. In real life, prefer injecting the value.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    public JsonEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(Object object) {
        try {
            return objectMapper.writeValueAsString(object).getBytes();
        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }
        return "".getBytes();
    }
}

您的问题听起来像是在为添加到日志中的所有消息使用一个对象(我们称之为customMessage)。如果是这种情况,那么序列化程序可能更像这样:

package com.project.serializer;
    
public class CustomMessageEncoder implements Encoder<CustomMessage> {
    public CustomMessageEncoder(VerifiableProperties verifiableProperties) {
        /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(CustomMessage customMessage) {
        return customMessage.toBytes();
    }
}

这将使您的属性配置如下所示:

props.put("serializer.class", "path.to.your.CustomSerializer");
 类似资料:
  • 问题内容: 序列化器很少,例如 我们如何创建自己的自定义序列化程序? 问题答案: 在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。 我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。 在生产者端序列化MyMessage。 您应该创建一个实现org.apache.kafka.comm

  • 如何创建自己的自定义序列化程序?

  • easyopen序列化使用fastjson处理json,xstream处理xml。现在我们来自定义实现一个json处理: 新建一个类JsonFormatter,实现ResultSerializer接口 public class JsonFormatter implements ResultSerializer { @Override public String serialize(

  • 我正在使用Spring Kafka集成,我有自己的值通用序列化器/反序列化器,如下所示 序列化程序: 反序列化程序: 序列化程序工作得很好,但是当涉及到在消费消息时反序列化值时,我得到了一个而不是所需的对象,请告诉我我错在哪里,提前感谢。

  • 我正在构建一个简单的项目与Spring boot和sping-kafka,我不能配置它,使其工作,它是一个简单的应用程序,生成笔记(作者,内容,createddatetime,lastmodefieddatetime)和发送基于笔记的事件,当他们被创建。 我已经玩了两天了,但我想我还没学会。 这是我的配置,我很确定它有很多锅炉板,但我已经用了几个例子来使我的工作。 我有2个生产者和消费者工厂,因为

  • 我正在将Spark Scala应用程序Kafka API升级到0.10版。我曾经创建自定义方法来反序列化字节字符串格式的消息。 我已经意识到有一种方法可以将StringDeserializer或ByteArrayDeserializer作为参数传递给键或值。 但是,我找不到有关如何创建自定义Avro模式反序列化器的任何信息,以便我的kafkaStream在创建DirectStream和使用Kafk