import org.apache.spark.sql.SaveMode
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.types._
@SQLUserDefinedType(udt = classOf[AUDT])
case class A(list:Seq[B])
class AUDT extends UserDefinedType[A] {
override def sqlType: DataType = StructType(Seq(StructField("list", ArrayType(BUDT, containsNull = false), nullable = true)))
override def userClass: Class[A] = classOf[A]
override def serialize(obj: Any): Any = obj match {
case A(list) =>
val row = new GenericMutableRow(1)
row.update(0, new GenericArrayData(list.map(_.asInstanceOf[Any]).toArray))
row
}
override def deserialize(datum: Any): A = {
datum match {
case row: InternalRow => new A(row.getArray(0).toArray(BUDT).toSeq)
}
}
}
object AUDT extends AUDT
@SQLUserDefinedType(udt = classOf[BUDT])
case class B(num:Int)
class BUDT extends UserDefinedType[B] {
override def sqlType: DataType = StructType(Seq(StructField("num", IntegerType, nullable = false)))
override def userClass: Class[B] = classOf[B]
override def serialize(obj: Any): Any = obj match {
case B(num) =>
val row = new GenericMutableRow(1)
row.setInt(0, num)
row
}
override def deserialize(datum: Any): B = {
datum match {
case row: InternalRow => new B(row.getInt(0))
}
}
}
object BUDT extends BUDT
object TestNested {
def main(args:Array[String]) = {
val col = Seq(new A(Seq(new B(1), new B(2))),
new A(Seq(new B(3), new B(4))))
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("TestSpark"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val df = sc.parallelize(1 to 2 zip col).toDF()
df.show()
df.write.mode(SaveMode.Overwrite).save(...)
}
}
15/09/16 16:44:39 ERROR Executor:stage 1.0(TID 1)中任务0.0中的异常java.lang.IllegalArgumentException:应重复嵌套类型:required group array{required int32 num;}在org.apache.parquet.schema.conversionPatterns.listWrapper(conversionPatterns.java:42),在org.apache.parquet.schema.conversionPatterns.listType(conversionPatterns.java:97),在在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter$$anonfun$convertfield$1.在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter$$scaltschemaconverter$$scaltschemaconverter$$scaltschemaconverter$$anonfun$convertfield$1.在科莱克蒂在scala.collection.mutable.arrayops$ofref.foldleft(arrayops.scala:108)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convertfield(catalystschemaconverter.scala:521)在converter.convertfield(catalystschemaconverter.scala:526)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convertfield(catalystschemaconverter.scala:318)在org.apache.spark.execution.sql.execution.datasources.parquet.catalystschemaconverter$$anonfun$convert$1.在a.collection.traversablelike$$anonfun$map$1.apply(traversablelike.scala:244)在scala.collection.traversablelike$$anonfun$map$1.在scala.collection.iterator$class.foreach(iterator.scala:244)在scala.collection.abstractiterator.foreach(iterator.scala:727)在scala.collection.abstraction.foreach(iterator.scala:1157)在scala.collection.iterablelike$class.foreach(iterablelike.scala:72)在structType.map(structType.scala:92)在org.apache.spark.sql.execution.datasources.parquet.catalystschemaconverter.convert(catalystschemaconverter.scala:311)在org.apache.spark.sql.execution.datasources.parquet.parquet.tatasources.parquet.parquettypesconverter$.convertfromattributes(parquettypesconvert)在getRecordWriter(ParquetOutputFormat.java:288)位于org.apache.parquet.hadoop.parquetOutputFormat.getorg.apache.spark.sql.execution.datasources.parquet.parquetOutputWriter的RecordWriter(ParquetRelation.scala:94)在org.apache.spark.sql.execution.datasources.parquet.parquetOutputWriter.(ParquetRelation.scala:94)在org.apache.spark.sql.execution.datasources.parquet.parquetRelations$$Anon$3.newStance(relation$$anonfun$run$1$$anonfun$apply$mcv$sp$3.在org.apache.spark.sql.execution.datasources.insertintohadoopfsrelations$$anonfun$run$1$$anonfun$apply$mcv$sp$3.在org.apache.spark.spark.executor.execution应用(insertintohadoopfsrelation.scala:150)在org.apache.spark.spark.execution tor$taskrunner.run(executor.scala:214)在java.util.concurrent.threadpoolexecutor.runworker(threadpoolexecutor.java:1145)在java.util.concurrent.threadpoolexecutor$worker.run(threadpoolexecutor.java:615)位于java.lang.thread.run(thread.java:745)15/09/16 16:44:39 WARN tasksetmanager:在stage 1.0(TID 1,localhost)中丢失任务0.0:
如果a用B而不是a保存dataframe,则没有问题,因为B不是嵌套的自定义类。我是不是漏掉了什么?
我必须对代码进行四次更改才能使其正常工作(在Linux上的Spark1.6.0中测试),我想我可以解释为什么需要这些更改。然而,我确实发现自己在想,是否有更简单的解决方案。所有更改都在audt
中,如下所示:
sqltype
时,应使其依赖于budt.sqltype
,而不仅仅是budt
。serialize()
中,对每个列表元素调用budt.serialize()
。deserialize()
中的
toArray(BUDT.sqltype)
而不是toArray(BUDT)
budt.deserialize()
以下是结果代码:
class AUDT extends UserDefinedType[A] {
override def sqlType: DataType =
StructType(
Seq(StructField("list",
ArrayType(BUDT.sqlType, containsNull = false),
nullable = true)))
override def userClass: Class[A] = classOf[A]
override def serialize(obj: Any): Any =
obj match {
case A(list) =>
val row = new GenericMutableRow(1)
val elements =
list.map(_.asInstanceOf[Any])
.map(e => BUDT.serialize(e))
.toArray
row.update(0, new GenericArrayData(elements))
row
}
override def deserialize(datum: Any): A = {
datum match {
case row: InternalRow =>
val first = row.getArray(0)
val bs:Array[InternalRow] = first.toArray(BUDT.sqlType)
val bseq = bs.toSeq.map(e => BUDT.deserialize(e))
val a = new A(bseq)
a
}
}
}
好吧,我一直在Kotlin试验原始数据存储,我有一个问题。我正在使用以下内容。原型文件: 这是我的序列化程序类: 和我的存储库: 因此,在我的updateValue()方法中,我可以设置“name”字段的名称,但我没有地址消息字段的setter,如street和number。编译器只显示getter。在姓名和年龄字段的另一边,我有setters。如何对这两个地址字段使用setters:街道、编号?
问题内容: 这个问题令我震惊。我已经创建了嵌套JSON一个POJO,我在获取数据对象,其中是一个具有两个元素。 这是MarketPrice POJO类,实际上我需要将其保存到表中。即,整个JSON对象。但是我有两个实体。这怎么可能? MarketPrice.java Items.java 这是我从控制器的服务器端获取的嵌套JSON数据: 中的JSON数据 Controller.java DAO.j
如何在PySpark中更改嵌套列的datatype?对于rxample,如何将value的数据类型从string更改为int? 参考:如何在pyspark中将Dataframe列从String类型更改为Double类型
本页包含内容: 嵌套类型实例 嵌套类型的引用 枚举类型常被用于实现特定类或结构体的功能。也能够在有多种变量类型的环境中,方便地定义通用类或结构体来使用,为了实现这种功能,Swift允许你定义嵌套类型,可以在枚举类型、类和结构体中定义支持嵌套的类型。 要在一个类型中嵌套另一个类型,将需要嵌套的类型的定义写在被嵌套类型的区域{}内,而且可以根据需要定义多级嵌套。 嵌套类型实例 下面这个例子定义了一个结
问题内容: 我正在开发一个使用Postgres 数据类型的Rails应用程序。我在名为的表中有一个JSON列。假设我有多个这样的条目: 我想做的是返回具有相同唱片集,src和背景的条目的不同组合(注意:在节点内,数组元素的顺序无关紧要)。例如,查询应将条目1,3作为一组进行匹配,将条目2与另一组进行匹配,依此类推。目标是找到前3个最常见的组合。我知道如何使用Ruby来执行此操作,但是我必须查询大量
我想保存学生和教师的信息使用单一的邮政API。如果我选择“学生”类型,数据将保存在学生表中。如果我选择“教师”类型,数据将保存在教师表中。但是该操作将由一个API调用完成。