val byteArrayInputStream = new ByteArrayInputStream(msg)
val datumReader:DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](messageSchema)
val dataFileReader:DataFileStream[GenericRecord] = new DataFileStream[GenericRecord](byteArrayInputStream, datumReader)
while(dataFileReader.hasNext) {
val userData1: GenericRecord = dataFileReader.next()
userData1.asInstanceOf[org.apache.avro.util.Utf8].toString
}
})
错误:错误:(49,9)找不到存储在数据集中的类型的编码器。导入spark.implicits支持基元类型(Int、String等)和产品类型(case类)。_在以后的版本中将增加对序列化其他类型的支持。.map(msg=>{
每当您试图在结构化流中对dataset进行映射/转换时,都希望它与适当的编码器相关联。
Tor基元数据类型,隐式编码器由Spark提供:
import spark.implicits._
对于其他类型,您需要手动提供。
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
问题内容: 我想为我正在执行的映射操作在DataSet中为Row类型编写一个编码器。本质上,我不了解如何编写编码器。 以下是地图操作的示例: 我知道,编码器需要编写如下字符串,而不是字符串: 但是,我不了解编码器中的clsTag(),并且我试图找到一个可以演示类似内容的运行示例(即,用于行类型的编码器) 编辑- 这不是所提问题的副本:尝试将数据框行映射到更新行时出现编码器错误,因为答案涉及在Spa
我正在编写一个netty TCP服务器,它必须根据请求中的值对响应进行编码,例如返回pro buf或JSON。建议的实现方法是什么? 我们的服务器有一个带有解码器、编码器和处理程序的管道。 我考虑使用解码器将正确的编码器添加到管道中,如下所示 这似乎有效,但是否正确?ChannelHandler文档使我想到了这个设计。 可以随时添加或删除通道处理器,因为通道管道是线程安全的。例如,可以在将要交换敏
问题内容: 我正在尝试对复杂的numpy数组进行JSON编码,并且我从astropy找到了一个实用程序(http://astropy.readthedocs.org/en/latest/_modules/astropy/utils/misc.html#JsonCustomEncoder)目的: 这对于复杂的numpy数组非常适用: 作为倾销的收益: 问题是,我无法自动将其读回到复杂的数组中。例如:
问题内容: 我正在尝试使用Avro来读取和写入Kafka的邮件。有没有人有使用Avro二进制编码器对将放入消息队列中的数据进行编码/解码的示例? 我需要的是Avro而不是Kafka。或者,也许我应该考虑其他解决方案?基本上,我试图在空间方面找到一种更有效的JSON解决方案。刚刚提到了Avro,因为它可以比JSON紧凑。 问题答案: 我终于想起要询问Kafka邮件列表,并得到以下答复,效果很好。 是
这是我的java程序:
如果你定义并注册了一个message codec,你可以将任何对象发送到event bus 上。 消息编解码器有一个名称,您在发送或发布该消息时在DeliveryOptions中指定该名称: eventBus.registerCodec(myCodec); DeliveryOptions options = new DeliveryOptions().setCodecName(myCodec.na