有没有办法将 GenericRecord(我刚刚从 Kafka 消息中得到的)反序列化为嵌套 POJO?我实际上正在将其反序列化为 Scala 的案例类,但我意识到这更难。我通过互联网搜索,似乎每个人都在手动进行。您知道任何能够做到这一点的库吗?
我想出了这个:
def valueAvroDeserializer[A](schemaRegistryUrl: String, targetType: Class[A]): Deserializer[A] = {
val readerSchema = ReflectData.get().getSchema(targetType)
val idSize = 4
val deserializer = new AbstractKafkaAvroDeserializer with Deserializer[A] {
def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
this.configure(new KafkaAvroDeserializerConfig(configs))
def deserialize(topic: String, data: Array[Byte]): A = {
val bytes = ByteBuffer.wrap(data)
bytes.get() // skip magic byte
val schemaId = bytes.getInt()
val writerSchema = schemaRegistry.getById(schemaId)
val length = bytes.limit() - 1 - idSize
val reader = new ReflectDatumReader[A](writerSchema, readerSchema)
val decoder = DecoderFactory.get().binaryDecoder(bytes.array(), bytes.position(), length, null)
reader.read(null.asInstanceOf[A], decoder)
}
def close(): Unit = {}
}
val props = Map("schema.registry.url" -> schemaRegistryUrl)
deserializer.configure(props.asJava, false)
deserializer
}
对于应用模式,有一个非常通用的编解码器派生解决方案:
https://github.com/danslapman/morphling
它不提供“导入和使用”解决方案,但它确实提供了一种方法,可以为您的协议编写自己的编解码器派生机制,而不会干扰shapeless/magnolia。
此外,如果您需要处理二进制数据,请尝试:
https://github.com/scodec/scodec
它提供了解决此类问题的非常scala方法。