当前位置: 首页 > 知识库问答 >
问题:

Apache Kafka:…StringDeserializer不是…Deserializer的实例

田志
2023-03-14

在我的简单应用程序中,我试图实例化一个KafkaConsumer,我的代码几乎是javadoc代码的副本(“自动偏移提交”):

@Slf4j
public class MyKafkaConsumer {

    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe( Arrays.asList("mytopic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                log.info( record.offset() + record.key() + record.value() );
                //System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

如果我尝试实例化这个,我得到:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
    ...
    Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
        ... 48 more

这个怎么修

共有3个答案

傅长恨
2023-03-14

您的Custom类需要实现org.apache.kafka.common.serialization.Deserializer。

喜欢

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.Serializable;
import java.util.Map;

//Developed by Arun Singh
public class Employee implements Serializable, Serializer, **Deserializer** {

@Override
    public Object deserialize(String s, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        Employee employee = null;
        try {
            //employee = mapper.readValue(bytes, Employee.class);
            employee = mapper.readValue(bytes.toString(), Employee.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return employee;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        ObjectMapper mapper = new ObjectMapper();
        Employee employee = null;
        try {
            //employee = mapper.readValue(bytes, Employee.class);
            employee = mapper.readValue(data.toString(), Employee.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return employee;
    }

    public void close() {

    }
}
吕鸿轩
2023-03-14

这可能是Kafka类加载的问题
将类加载器设置为null可能会有所帮助。

...
Thread currentThread = Thread.currentThread();    
ClassLoader savedClassLoader = currentThread.getContextClassLoader();

currentThread.setContextClassLoader(null);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

currentThread.setContextClassLoader(savedClassLoader);
...

有完整的解释:
https://stackoverflow.com/a/50981469/1673775

申高峯
2023-03-14

不确定这是不是最终修复了您的错误,但请注意,在对1.1.x kafka客户端jar使用spring kafka测试(版本2.1.x,从版本2.1.5开始)时,您需要重写某些可传递依赖项,如下所示:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

所以你的可传递依赖肯定有问题

 类似资料:
  • 我已经在kafka上工作了相当长的六个月,我对用户延迟和存储到主题分区中的数据有一些疑问。 问题1:最初,当我开始阅读Kafka并了解如何使用Kafka的功能时,我被教导说,一个只有一部分和一个复制因子的主题会创造奇迹。经过相当长的六个月的工作,将我的项目迁移到live之后,使用我的主题消息的消费者开始给我一个延迟。我阅读了许多关于消费者延迟的堆栈溢出答案,得出结论,如果我增加某个主题的分区和复制

  • 为了实现Kafka消费者对消息的一次处理,我一次提交一条消息,如下所示 上面的代码将消息的处理异步委托给下面的另一个类。 但是,这仍然不能保证只发送一次,因为如果处理失败,它可能仍会提交其他消息,并且以前的消息将永远不会被处理和提交,我在这里的选项是什么?

  • 我尝试在GitHub(https://github.com/onurtokat/kafka-clickstream-enrich)上模拟Gwen(Chen)Shapira的kafka-clickstream-rich kafka-stream项目。当我使用反序列化器使用消费者类消费一个主题时,我遇到了一个错误。自定义的Serde类有序列化器和反序列化器。但是,我试图理解为什么自定义serde用于反

  • 我们有一个应用程序,它使用来自Kafka主题(3个分区)的消息,丰富数据,并将记录保存在DB(Spring JPA)中,然后将消息发布到另一个Kafka主题(在同一个代理上),所有这些都通过使用Camel 2.4.1和Spring Boot 2.1.7进行编排。释放 我们想为 kafka 消费者-生产者组合实现“exactly-once”语义。 消费者设置: 生产者设置: 豆接线: 骆驼路线: 但

  • 对于这个示例: null 是否有类似这样的语句用于此检查?或者我应该使用

  • 我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所