当前位置: 首页 > 知识库问答 >
问题:

从kafka流和Avro反序列化同一类时出现ClassCastException

党宇定
2023-03-14

我已经用avro-maven-plugin从Avro模式中生成了我的Avro Java类。我将我的avro类序列化为一个字节数组,并将其写入kafka主题。

然后我有一个kafka流,它试图操纵avro数据来做一些事情。在反序列化过程中,我从同一个类中获得了一个ClassCastExcetion。我了解到这个问题是由于Avro在回退时使用了不同的类加载器(类加载器的新实例)而产生的。

有一种方法可以强制Avro使用调用方的ClassLoader或类似的东西?

KafkaStream属性

this.props = new Properties();
        this.props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        this.props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        this.props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        this.props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

我使用一个字符串键和序列化的avro的字节数组,然后我需要手动反序列化avro的有效载荷。我用avro的解码器反序列化成这样:

AvroPayload stp = AvroPayload.fromByteBuffer(ByteBuffer.wrap(bytes));

或者甚至像这样:

AvroPayload stp = AvroPayload.getDecoder().decode(ByteBuffer.wrap(bytes));

在第一个版本中,通过调试,我可以看到,如果我保持在avro生成的类上下文中,字节数组将正确地反序列化到AvroPayload类中。返回该新实例可能会引发一个<code>类异常</code>问题

共有1个答案

施彬彬
2023-03-14

我发现的唯一解决方案是按照建议将avro类放入外部jar中,然后导入它。

这不是一个好的解决方案,因为它需要大量配置来保持avro模式和生成的类的耦合,但这是我发现的唯一解决方案。

我配置了一个maven项目,将avro的类jar生成到一个目录中,名称中没有工件版本,因此我可以始终导入最新版本,而无需更改pom。

如果有人会发现另一种解决方案,请发布

 类似资料:
  • 我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju

  • 我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign

  • 我有一台装有Java 1.6的服务器。在这里,我需要使用Confluent的< code > KafkaAvroDeserializer 来反序列化avro消息。 问题是: 如果我使用Confluent-1.0(它与Java兼容 如果我使用Confluent-2.0或更高版本,它拥有一切,但它只与java兼容 在这种情况下我该怎么办? 为了比较: http://docs.confluent.io/