当前位置: 首页 > 面试题库 >

如何在kafka中创建自定义序列化程序?

燕智
2023-03-14
问题内容

序列化器很少,例如

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?


问题答案:

在这里,您有一个示例,将自己的序列化器/解串器用于Kafka消息值。对于Kafka消息密钥是同一回事。

我们希望将MyMessage的序列化版本作为Kafka值发送,并再次将其反序列化为使用方的MyMessage对象。

在生产者端序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Serializer的序列化器类。

serialize() 方法可以完成工作,接收对象并以字节数组形式返回序列化的版本。

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            (serialize your MyMessage html" target="_blank">object into bytes)

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在用户端反序列化MyMessage。

您应该创建一个实现org.apache.kafka.common.serialization.Deserializer的反序列化器类。

deserialize() 方法可以完成工作,以字节数组形式接收序列化的值并返回您的对象。

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}


 类似资料:
  • 如何创建自己的自定义序列化程序?

  • 我在一个Kafka消息中使用了我自己的类,它有一堆字符串数据类型。 我想我需要编写自己的序列化器并将其提供给生产者属性?

  • 我希望在Jackson中为Enum类的集合创建一个自定义反序列化器。因为自定义反序列化器的行为对于所有枚举都是相同的。我想为我的所有枚举类创建公共反序列化程序。 我尝试使一般自定义反序列化如下: 问题是我想在反序列化器中调用枚举静态方法,但无法这样做,因为我没有任何可用的类/枚举上下文信息。 你能帮我知道如何实现它吗?

  • 我想创建一个自定义序列化程序,它只做一点点工作,然后将其余部分留给默认序列化。 例如: 通过为聚合对象创建其他自定义序列化器的想法,这些对象根据“特殊”属性值表现不同。然而,上面的代码不起作用,因为它毫不奇怪地进入了无限递归。 一旦我设置了属性,有没有办法告诉jackson使用默认序列化?我真的不想像许多自定义序列化程序那样枚举所有属性,因为该类相当复杂,并且我不想每次更改该类时都必须对序列化程序

  • 问题内容: 道歉,如果以前已经问过这个问题,我已经搜索了很多,并且许多答案都来自早期Swift的Beta版本,当时情况有所不同。我似乎找不到确切的答案。 我想创建一个子类,并有一个自定义的初始化程序,使我可以轻松地在代码中进行设置。我在Swift中无法执行此操作。 我想要一个可以用来传递特定参数的函数,然后将其与视图控制器一起使用。在我看来,它看起来像。如果我添加了该功能,则要求我添加该功能。 我

  • 问题内容: 我有两个要使用Jackson序列化为JSON的Java类: 我想将Item序列化为此JSON: 用户序列化为仅包含。我还将能够将所有用户对象序列化为JSON,例如: 所以我想我需要为此编写一个自定义的序列化程序并尝试过: 我使用来自Jackson How-to:Custom Serializers的 代码对JSON进行了序列化: 但是我得到这个错误: 如何在Jackson上使用自定义序