当前位置: 首页 > 知识库问答 >
问题:

如何在数据集中存储自定义对象?

秦琦
2023-03-14

根据Spark数据集介绍:

在我们期待Spark 2.0的时候,我们计划对数据集进行一些令人兴奋的改进,具体来说:。。。自定义编码器–虽然我们目前为各种类型自动生成编码器,但我们希望为自定义对象打开一个API。

并且尝试将自定义类型存储在Dataset中会导致以下错误:

无法找到存储在数据集中的类型的编码器。导入sqlC支持原始类型(Int、String等)和产品类型(case类)ontext.implicits._将在未来的版本中添加对其他类型序列化的支持

或:

JAVAlang.UnsupportedOperationException:找不到…的编码器。。。。

是否有任何现有的解决方法?

请注意,此问题仅作为社区维基答案的切入点存在。请随时更新/改进问题和答案。

共有3个答案

公德明
2023-03-14

您可以使用UDTRegistration,然后使用Case类、元组等。。。所有这些都可以正确使用用户定义的类型!

假设要使用自定义枚举:

trait CustomEnum { def value:String }
case object Foo extends CustomEnum  { val value = "F" }
case object Bar extends CustomEnum  { val value = "B" }
object CustomEnum {
  def fromString(str:String) = Seq(Foo, Bar).find(_.value == str).get
}

像这样注册:

// First define a UDT class for it:
class CustomEnumUDT extends UserDefinedType[CustomEnum] {
  override def sqlType: DataType = org.apache.spark.sql.types.StringType
  override def serialize(obj: CustomEnum): Any = org.apache.spark.unsafe.types.UTF8String.fromString(obj.value)
  // Note that this will be a UTF8String type
  override def deserialize(datum: Any): CustomEnum = CustomEnum.fromString(datum.toString)
  override def userClass: Class[CustomEnum] = classOf[CustomEnum]
}

// Then Register the UDT Class!
// NOTE: you have to put this file into the org.apache.spark package!
UDTRegistration.register(classOf[CustomEnum].getName, classOf[CustomEnumUDT].getName)

然后使用它!

case class UsingCustomEnum(id:Int, en:CustomEnum)

val seq = Seq(
  UsingCustomEnum(1, Foo),
  UsingCustomEnum(2, Bar),
  UsingCustomEnum(3, Foo)
).toDS()
seq.filter(_.en == Foo).show()
println(seq.collect())

假设您想使用多态记录:

trait CustomPoly
case class FooPoly(id:Int) extends CustomPoly
case class BarPoly(value:String, secondValue:Long) extends CustomPoly

。。。使用方法如下:

case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

您可以编写一个自定义UDT,将所有内容编码为字节(我在这里使用java序列化,但最好插入Spark的Kryo上下文)。

首先定义UDT类:

class CustomPolyUDT extends UserDefinedType[CustomPoly] {
  val kryo = new Kryo()

  override def sqlType: DataType = org.apache.spark.sql.types.BinaryType
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(obj)

    bos.toByteArray
  }
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    val obj = ois.readObject()
    obj.asInstanceOf[CustomPoly]
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}

然后注册它:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)

那你就用吧!

// As shown above:
case class UsingPoly(id:Int, poly:CustomPoly)

Seq(
  UsingPoly(1, new FooPoly(1)),
  UsingPoly(2, new BarPoly("Blah", 123)),
  UsingPoly(3, new FooPoly(1))
).toDS

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()
应翰飞
2023-03-14

>

  • 使用通用编码器。

    现在有两个通用编码器可供使用kryojavaSeriize,其中后一个显式描述为:

    效率极低,只能作为最后手段使用。

    假设以下类

    class Bar(i: Int) {
      override def toString = s"bar $i"
      def bar = i
    }
    

    您可以通过添加隐式编码器来使用这些编码器:

    object BarEncoders {
      implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = 
      org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    它们可以一起使用如下:

    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarEncoders._
    
        val ds = Seq(new Bar(1)).toDS
        ds.show
    
        sc.stop()
      }
    }
    

    它将对象存储为二进制列,因此当转换为DataFrame时,您会得到以下模式:

    root
     |-- value: binary (nullable = true)
    

    也可以使用特定字段的kryo编码器对元组进行编码:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
    
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    请注意,这里我们不依赖隐式编码器,而是显式传递编码器,因此这很可能不适用于toDS方法。

    使用隐式转换:

    在可编码的表示和自定义类之间提供隐式转换,例如:

    object BarConversions {
      implicit def toInt(bar: Bar): Int = bar.bar
      implicit def toBar(i: Int): Bar = new Bar(i)
    }
    
    object Main {
      def main(args: Array[String]) {
        val sc = new SparkContext("local",  "test", new SparkConf())
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        import BarConversions._
    
        type EncodedBar = Int
    
        val bars: RDD[EncodedBar]  = sc.parallelize(Seq(new Bar(1)))
        val barsDS = bars.toDS
    
        barsDS.show
        barsDS.map(_.bar).show
    
        sc.stop()
      }
    }
    

    相关问题:

    • 如何为选项类型构造函数创建编码器,例如Option[Int]

  • 萧自珍
    2023-03-14

    这个答案仍然有效且信息丰富,尽管自2.2/2.3版本以来情况有所好转,它增加了内置编码器对Set、Seq、Map、Date、Timestamp和bigdecimic的支持。如果您坚持只使用case类和通常的Scala类型来创建类型,那么只使用隐式inSQLImplicits就可以了。

    不幸的是,几乎没有添加任何内容来帮助实现这一点。在编码器中搜索自2.0.0以来的。scala或SQLImplicits。scala发现主要与基元类型有关(以及对case类的一些调整)。所以,首先要说的是:目前对自定义类编码器没有真正好的支持。考虑到我们目前拥有的资源,接下来要介绍的是一些技巧,这些技巧能让我们尽可能地做好工作。作为一个预先免责声明:这不会很好地工作,我会尽最大努力明确所有限制。

    当您想要创建数据集时,Spark“需要一个编码器(将T类型的JVM对象转换为内部Spark SQL表示形式,或从内部Spark SQL表示形式转换为内部Spark SQL表示形式),该编码器通常是通过SparkSession的隐式自动创建的,或者可以通过调用编码器上的静态方法显式创建的(摘自createDataset上的文档)。编码器的形式为编码器,其中编码类型为编码器。第一个建议是添加import spark。隐含的_ (它为您提供了这些隐式编码器),第二个建议是使用这组与编码器相关的函数显式传入隐式编码器。

    常规类没有可用的编码器,因此

    import spark.implicits._
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    将给您带来以下隐式相关编译时错误:

    无法找到存储在数据集中的类型的编码器。导入sqlC支持原始类型(Int、String等)和产品类型(case类)ontext.implicits._将在未来的版本中添加对其他类型序列化的支持

    但是,如果您在扩展Products的某个类中包装您刚刚用于获取上述错误的任何类型,则错误会令人困惑地延迟到运行时,因此

    import spark.implicits._
    case class Wrap[T](unwrap: T)
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
    

    编译很好,但在运行时失败

    JAVAlang.UnsupportedOperationException:未找到MyObj的编码器

    这样做的原因是Spark创建的带有隐含的编码器实际上只在运行时制作(通过scalarelfection)。在这种情况下,Spark在编译时的所有检查都是最外面的类扩展了Products(所有case类都这样做),并且只有在运行时才意识到它仍然不知道如何处理MyObj(如果我尝试制作Dataset[(Int, MyObj)]-Spark等到运行时才呕吐MyObj)。这些是急需修复的核心问题:

    • 一些扩展产品的类编译,尽管在运行时总是崩溃并且
    • 无法为嵌套类型传入自定义编码器(我无法为Spark提供一个仅MyObj的编码器,以便它知道如何编码包装[MyObj](Int, MyObj))。

    大家建议的解决方案是使用kryo编码器。

    import spark.implicits._
    class MyObj(val i: Int)
    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    但这很快就会变得相当乏味。尤其是如果您的代码正在处理各种数据集、连接、分组等,那么最终会产生一堆额外的隐式。那么,为什么不直接创建一个隐式函数来自动完成这一切呢?

    import scala.reflect.ClassTag
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = 
      org.apache.spark.sql.Encoders.kryo[A](ct)
    

    现在,我似乎可以做任何我想做的事情(下面的示例在spark shell中不起作用,其中spark.implicits是自动导入的)

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
    val d3 = d1.map(d => (d.i,  d)).alias("d3") // .. deals with the new type
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
    

    或者差不多。问题是,使用kryo会导致Spark将数据集中的每一行存储为一个简单的二进制对象。对于map、filter、foreach这样的操作就足够了,但是对于join这样的操作,Spark确实需要将它们分隔成列。检查d2或d3的架构,您会看到只有一个二进制列:

    d2.printSchema
    // root
    //  |-- value: binary (nullable = true)
    

    因此,利用Scala中隐式的魔力(更多内容请参见6.26.3重载解析),我可以为自己创建一系列隐式,这些隐式将尽可能好地工作,至少对于元组来说是这样,并且可以很好地与现有隐式配合使用:

    import org.apache.spark.sql.{Encoder,Encoders}
    import scala.reflect.ClassTag
    import spark.implicits._  // we can still take advantage of all the old implicits
    
    implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c)
    
    implicit def tuple2[A1, A2](
      implicit e1: Encoder[A1],
               e2: Encoder[A2]
    ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2)
    
    implicit def tuple3[A1, A2, A3](
      implicit e1: Encoder[A1],
               e2: Encoder[A2],
               e3: Encoder[A3]
    ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3)
    
    // ... you can keep making these
    

    然后,有了这些隐含信息,我就可以使上面的示例起作用,尽管需要对列进行重命名

    class MyObj(val i: Int)
    
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2")
    val d3 = d1.map(d => (d.i  ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3")
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1")
    

    我还没有弄清楚如何获得预期的元组名称(\u 1\u 2,…)默认情况下,不重命名它们-如果其他人想玩这个游戏,这就是名称“value”的引入位置,这也是通常添加元组名称的位置。然而,关键是我现在有了一个很好的结构化模式:

    d4.printSchema
    // root
    //  |-- _1: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    //  |-- _2: struct (nullable = false)
    //  |    |-- _1: integer (nullable = true)
    //  |    |-- _2: binary (nullable = true)
    

    综上所述,此解决方案:

    • 允许我们为元组获取单独的列(这样我们可以再次加入元组,耶!)
    • 我们可以再次依赖隐式(因此无需到处传入kryo)
    • 几乎完全向后兼容导入spark。隐含的_ (涉及一些重命名)
    • 不允许我们加入kyro序列化二进制列,更不用说那些可能有
    • 有一个令人不快的副作用,就是将一些元组列重命名为“value”(如有必要,可以通过转换toDF、指定新列名并转换回数据集来撤消此操作,而架构名称似乎是通过最需要的联接来保留的)

    这个不太令人愉快,也没有好的解决方案。然而,现在我们有了上面的元组解决方案,我有一种预感,来自另一个答案的隐式转换解决方案也会不那么痛苦,因为您可以将更复杂的类转换为元组。然后,在创建数据集后,您可能会使用数据框架方法重命名列。如果一切顺利,这确实是一个改进,因为我现在可以对类的字段执行连接。如果我只使用一个平面二进制kryo序列化器,那是不可能的。

    这里有一个例子,可以做很多事情:我有一个类MyObj,它的字段类型为Int,java。util。UUID和设置[字符串]。第一个自己照顾自己。第二,虽然我可以使用kryo进行序列化,但如果将其存储为字符串(因为UUID通常是我想要加入的对象),那么它会更有用。第三个实际上只属于二进制列。

    class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String])
    
    // alias for the type to convert to and from
    type MyObjEncoded = (Int, String, Set[String])
    
    // implicit conversions
    implicit def toEncoded(o: MyObj): MyObjEncoded = (o.i, o.u.toString, o.s)
    implicit def fromEncoded(e: MyObjEncoded): MyObj =
      new MyObj(e._1, java.util.UUID.fromString(e._2), e._3)
    

    现在,我可以使用以下机制创建具有良好模式的数据集:

    val d = spark.createDataset(Seq[MyObjEncoded](
      new MyObj(1, java.util.UUID.randomUUID, Set("foo")),
      new MyObj(2, java.util.UUID.randomUUID, Set("bar"))
    )).toDF("i","u","s").as[MyObjEncoded]
    

    模式向我显示了具有正确名称的I列,以及前两个我可以连接的内容。

    d.printSchema
    // root
    //  |-- i: integer (nullable = false)
    //  |-- u: string (nullable = true)
    //  |-- s: binary (nullable = true)
    
     类似资料:
    • 我有以下对象需要存储在Cassandra中。我需要使用UDT还是有其他方法来存储对象。我需要最终使用存储库方法从spry-boot应用程序存储它。

    • 你好,我可能监督了一些事情,但事情是这样的 我有一个

    • 我是新来的反应-本地人。所以,我有一个名为任务的自定义组件,我有一些数据存储在Asyncstore中。我已经检查了数据是否确实通过另一个组件/模态存储在Asyncstore中。但是当我试图从Asyncstore获取数据时(请参阅下面的代码),它不会显示任何内容。我的自定义组件显示数据很好,只是无法从Asyncstore获取/显示数据。我是不是做错了什么?请帮帮我。 函数的内部调用:

    • 当我试图在Github上使用自己的指南测试tenstorFlow目标检测API时发生了一个错误我在运行他们指南中提到的测试脚本时遇到了以下错误 python对象检测/构建器/模型构建器测试。py/home/appu/anaconda3/envs/ml/lib/python3。6/importlib/_引导。py:219:RuntimeWarning:compiletime模块“tensorflow

    • 如果你需要提供自定义文件存储 – 一个普遍的例子是在某个远程系统上储存文件 – 你可以通过定义一个自定义的储存类来实现。你需要遵循以下步骤: 1. 你的自定义储存类必须是django.core.files.storage.Storage的子类: from django.core.files.storage import Storage class MyStorage(Storage):

    • 问题内容: 我想将-object 保存到Android存储中的某个位置以快速检索并在其中显示数据。 这可能吗?如果可以,那么SQLite或外部存储适合哪种技术? 问题答案: 例。 并从活动中致电 不要忘记在清单文件中使用write_external_storage权限。