当前位置: 首页 > 知识库问答 >
问题:

Avro GenericRecord to nested POJO

薛泰
2023-03-14

有没有办法将 GenericRecord(我刚刚从 Kafka 消息中得到的)反序列化为嵌套 POJO?我实际上正在将其反序列化为 Scala 的案例类,但我意识到这更难。我通过互联网搜索,似乎每个人都在手动进行。您知道任何能够做到这一点的库吗?

共有2个答案

斜瑞
2023-03-14

我想出了这个:

  def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4


val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
    def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
      this.configure(new KafkaAvroDeserializerConfig(configs))

  def deserialize(topic: String, data: Array[Byte]): A = {
      val bytes = ByteBuffer.wrap(data)
      bytes.get() // skip magic byte
      val schemaId = bytes.getInt()
      val writerSchema = schemaRegistry.getById(schemaId)
      val length = bytes.limit() - 1 - idSize
      val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
      val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
      reader.read(null.asInstanceOf[A], decoder)
    }

  def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer

}

曹振
2023-03-14

对于应用模式,有一个非常通用的编解码器派生解决方案:

https://github.com/danslapman/morphling

它不提供“导入和使用”解决方案,但它确实提供了一种方法,可以为您的协议编写自己的编解码器派生机制,而不会干扰shapeless/magnolia。

此外,如果您需要处理二进制数据,请尝试:

https://github.com/scodec/scodec

它提供了解决此类问题的非常scala方法。

 类似资料:

相关问答

相关文章

相关阅读