当前位置: 首页 > 知识库问答 >
问题:

在Kafka消费者API中实现Deserializer和Serde有什么区别?

秋光熙
2023-03-14

我尝试在GitHub(https://github.com/onurtokat/kafka-clickstream-enrich)上模拟Gwen(Chen)Shapira的kafka-clickstream-rich kafka-stream项目。当我使用反序列化器使用消费者类消费一个主题时,我遇到了一个错误。自定义的Serde类有序列化器和反序列化器。但是,我试图理解为什么自定义serde用于反序列化器,然后消费者API给出了错误,因为它不是org.apache.kafka.common.serialization.Deserializer的实例

该主题可以使用带有Serdes的KTable。Integer()序列化器和新的ProfileSerde()反序列化器,如下所示。

KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
                Consumed.with(Serdes.Integer(), new ProfileSerde()),
                Materialized.as("profile-store"));

定制塞尔德被定义为;

static public final class ProfileSerde extends WrapperSerde<UserProfile> {
        public ProfileSerde() {
            super(new JsonSerializer<UserProfile>(), new JsonDeserializer<UserProfile>(UserProfile.class));
        }
    }

和通用Serde定制喜欢下面;

package com.onurtokat.serde;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;


public class WrapperSerde<T> implements Serde<T> {

    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public WrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

我的消费就是这么简单,下面就能看出来;

package com.onurtokat.consumers;

import com.onurtokat.ClickstreamEnrichment;
import com.onurtokat.Constants;
import com.onurtokat.model.UserProfile;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumeProfileData {

    public static void main(String[] args) {
        //prepare config
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ClickstreamEnrichment.ProfileSerde.class);

        KafkaConsumer<Integer, UserProfile> consumerProfileTopic = new KafkaConsumer<>(config);
        consumerProfileTopic.subscribe(Arrays.asList(Constants.USER_PROFILE_TOPIC));
        while (true) {
            ConsumerRecords<Integer, UserProfile> records = consumerProfileTopic.poll(Duration.ofMillis(100));
            for (ConsumerRecord<Integer, UserProfile> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

当我尝试与我的消费者消费主题时,错误是;

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:811)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
    at com.onurtokat.consumers.ConsumeProfileData.main(ConsumeProfileData.java:25)
Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more

共有2个答案

窦凯定
2023-03-14

看来你误解了:

可以使用带有Serdes.Integer()序列化器和新ProfileSerde()反序列化器的KTable来使用该主题,如下所示。

您必须为Consumed.with()提供KeySerde和ValueSerde。

关于例外:

非常清楚——您必须设置反序列化器(不是Serde)的实现

config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, //here);
糜帅
2023-03-14

不同的是:

    < li>Serdes由Kafka的Streams API(也称为Kafka Streams)使用。Serde是同一数据类型的一对(1)序列化程序和(2)反序列化程序的包装器——参见下面两个要点。即一个< code>Serde

关于:

Caused by: org.apache.kafka.common.KafkaException: com.onurtokat.ClickstreamEnrichment$ProfileSerde is not an instance of org.apache.kafka.common.serialization.Deserializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:712)
    ... 3 more

您的 Kafka 消费者客户端代码被赋予了 Serde,它期望有一个解串器

 类似资料:
  • “Kafka spout”和“Kafka Consumer”都从Kafka经纪人那里检索数据,到目前为止我知道的spout是用来与Storm通信的,而Consumer是用来与其他任何东西通信的。 --但是,技术上的区别是什么? -或者,如果我使用Consumer提取数据,然后使用“Storm Spout”接收数据,和如果我只是使用“Kafka Spout”,然后将其添加到我的Storm拓扑构建器的

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 本文向大家介绍消费者和消费者组有什么关系?相关面试题,主要包含被问及消费者和消费者组有什么关系?时的应答技巧和注意事项,需要的朋友参考一下 每个消费者从属于消费组。具体关系如下:

  • 本文向大家介绍什么是kafka消费者组?相关面试题,主要包含被问及什么是kafka消费者组?时的应答技巧和注意事项,需要的朋友参考一下 答:消费者组的概念是Apache Kafka独有的。基本上,每个Kafka消费群体都由一个或多个共同消费一组订阅主题的消费者组成。

  • 假设我有一个名为“MyTopic”的主题,它有3个分区P0、P1和P2。这些分区中的每一个都有一个leader,并且本主题的数据(消息)分布在这些分区中。 1.Producer将始终根据代理上的负载以循环方式写到分区的领导者。对吗? 2.制作人如何认识隔断的首领?

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。