当前位置: 首页 > 知识库问答 >
问题:

用嵌套的用户数据类型保存Spark DataFrames

章鸿光
2023-03-14
import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._

@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])

class AUDT extends UserDefinedType[A] {
  override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
  override def userClass: Class[A] = classOf[A]
  override def serialize(obj: Any): Any = obj match {
    case A(list) =>
      val row = new GenericMutableRow(1)
      row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
      row
  }

  override def deserialize(datum: Any): A = {
    datum match {
      case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
    }
  }
}

object AUDT extends AUDT

@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)

class BUDT extends UserDefinedType[B] {
  override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
  override def userClass: Class[B] = classOf[B]
  override def serialize(obj: Any): Any = obj match {
    case B(num) =>
      val row = new GenericMutableRow(1)
      row.setInt(0, num)
      row
  }

  override def deserialize(datum: Any): B = {
    datum match {
      case row: InternalRow => new B(row.getInt(0))
    }
  }
}

object BUDT extends BUDT

object TestNested {
  def main(args:Array[String]) = {
    val col = Seq(new A(Seq(new B(1), new B(2))),
                  new A(Seq(new B(3), new B(4))))

    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 2 zip col).toDF()
    df.show()

    df.write.mode(SaveMode.Overwrite).save(...)
  }
}

15/09/16 16:44:39 ERROR Executor:stage 1.0(TID 1)中任务0.0中的异常java.lang.IllegalArgumentException:应重复嵌套类型:required group array{required int32 num;}在org.apache.parquet.schema.conversionPatterns.listWrapper(conversionPatterns.java:42),在org.apache.parquet.schema.conversionPatterns.listType(conversionPatterns.java:97),在在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter$$anonfun$convertfield$1.在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter$$scaltschemaconverter$$scaltschemaconverter$$scaltschemaconverter$$anonfun$convertfield$1.在科莱克蒂在scala.collection.mutable.arrayops$ofref.foldleft(arrayops.scala:108)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convertfield(catalystschemaconverter.scala:521)在converter.convertfield(catalystschemaconverter.scala:526)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convertfield(catalystschemaconverter.scala:318)在org.apache.spark.execution.sql.execution.datasources.parquet.catalystschemaconverter$$anonfun$convert$1.在a.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:244)在scala.collection.traversablelike$$anonfun$map$1.在scala.collection.iterator$class.foreach(iterator.scala:244)在scala.collection.abstractiterator.foreach(iterator.scala:727)在scala.collection.abstraction.foreach(iterator.scala:1157)在scala.collection.iterablelike$class.foreach(iterablelike.scala:72)在structType.map(structType.scala:92)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convert(catalystschemaconverter.scala:311)在org.apache.spark.sql.execution.datasources.parquet.parquet.tatasources.parquet.parquettypesconverter$.convertfromattributes(parquettypesconvert)在getRecordWriter(ParquetOutputFormat.java:288)位于org.apache.parquet.hadoop.parquetOutputFormat.getorg.apache.spark.sql.execution.datasources.parquet.parquetOutputWriter的RecordWriter(ParquetRelation.scala:94)在org.apache.spark.sql.execution.datasources.parquet.parquetOutputWriter.(ParquetRelation.scala:94)在org.apache.spark.sql.execution.datasources.parquet.parquetRelations$$Anon$3.newStance(relation$$anonfun$run$1$$anonfun$apply$mcv$sp$3.在org.apache.spark.sql.execution.datasources.insertintohadoopfsrelations$$anonfun$run$1$$anonfun$apply$mcv$sp$3.在org.apache.spark.spark.executor.execution应用(insertintohadoopfsrelation.scala:150)在org.apache.spark.spark.execution tor$taskrunner.run(executor.scala:214)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615)位于java.lang.thread.run(thread.java:745)15/09/16 16:44:39 WARN tasksetmanager:在stage 1.0(TID 1,localhost)中丢失任务0.0:

如果a用B而不是a保存dataframe,则没有问题,因为B不是嵌套的自定义类。我是不是漏掉了什么?

共有1个答案

冷涵忍
2023-03-14

我必须对代码进行四次更改才能使其正常工作(在Linux上的Spark1.6.0中测试),我想我可以解释为什么需要这些更改。然而,我确实发现自己在想,是否有更简单的解决方案。所有更改都在audt中,如下所示:

  • 定义sqltype时,应使其依赖于budt.sqltype,而不仅仅是budt
  • serialize()中,对每个列表元素调用budt.serialize()
  • deserialize()中的
    • 调用toArray(BUDT.sqltype)而不是toArray(BUDT)
    • 对每个元素调用budt.deserialize()

    以下是结果代码:

    class AUDT extends UserDefinedType[A] {
      override def sqlType: DataType =
        StructType(
          Seq(StructField("list",
                          ArrayType(BUDT.sqlType, containsNull = false),
                          nullable = true)))
    
      override def userClass: Class[A] = classOf[A]
    
      override def serialize(obj: Any): Any = 
        obj match {
          case A(list) =>
            val row = new GenericMutableRow(1)
            val elements =
              list.map(_.asInstanceOf[Any])
                  .map(e => BUDT.serialize(e))
                  .toArray
            row.update(0, new GenericArrayData(elements))
            row
        }
    
      override def deserialize(datum: Any): A = {
        datum match {
          case row: InternalRow => 
            val first = row.getArray(0)
            val bs:Array[InternalRow] = first.toArray(BUDT.sqlType)
            val bseq = bs.toSeq.map(e => BUDT.deserialize(e))
            val a = new A(bseq)
            a
        }
      }
    
    }
    

 类似资料:
  • 好吧,我一直在Kotlin试验原始数据存储,我有一个问题。我正在使用以下内容。原型文件: 这是我的序列化程序类: 和我的存储库: 因此,在我的updateValue()方法中,我可以设置“name”字段的名称,但我没有地址消息字段的setter,如street和number。编译器只显示getter。在姓名和年龄字段的另一边,我有setters。如何对这两个地址字段使用setters:街道、编号?

  • 问题内容: 这个问题令我震惊。我已经创建了嵌套JSON一个POJO,我在获取数据对象,其中是一个具有两个元素。 这是MarketPrice POJO类,实际上我需要将其保存到表中。即,整个JSON对象。但是我有两个实体。这怎么可能? MarketPrice.java Items.java 这是我从控制器的服务器端获取的嵌套JSON数据: 中的JSON数据 Controller.java DAO.j

  • 如何在PySpark中更改嵌套列的datatype?对于rxample,如何将value的数据类型从string更改为int? 参考:如何在pyspark中将Dataframe列从String类型更改为Double类型

  • 本页包含内容: 嵌套类型实例 嵌套类型的引用 枚举类型常被用于实现特定类或结构体的功能。也能够在有多种变量类型的环境中,方便地定义通用类或结构体来使用,为了实现这种功能,Swift允许你定义嵌套类型,可以在枚举类型、类和结构体中定义支持嵌套的类型。 要在一个类型中嵌套另一个类型,将需要嵌套的类型的定义写在被嵌套类型的区域{}内,而且可以根据需要定义多级嵌套。 嵌套类型实例 下面这个例子定义了一个结

  • 问题内容: 我正在开发一个使用Postgres 数据类型的Rails应用程序。我在名为的表中有一个JSON列。假设我有多个这样的条目: 我想做的是返回具有相同唱片集,src和背景的条目的不同组合(注意:在节点内,数组元素的顺序无关紧要)。例如,查询应将条目1,3作为一组进行匹配,将条目2与另一组进行匹配,依此类推。目标是找到前3个最常见的组合。我知道如何使用Ruby来执行此操作,但是我必须查询大量

  • 我想保存学生和教师的信息使用单一的邮政API。如果我选择“学生”类型,数据将保存在学生表中。如果我选择“教师”类型,数据将保存在教师表中。但是该操作将由一个API调用完成。