当前位置: 首页 > 知识库问答 >
问题:

Spark 2.1:使用toDS()函数将RDD转换为带有自定义列的Dataset

淳于昊然
2023-03-14
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_1")
- root class: "scala.Tuple3"
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
    at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:344)
    at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
    at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:425)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
    at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
    at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:49)
    at observatory.Extraction$.locationYearlyAverageRecords(Extraction.scala:114)
    at observatory.Extraction$.processExtraction(Extraction.scala:28)
    at observatory.Main$.delayedEndpoint$observatory$Main$1(Main.scala:18)
    at observatory.Main$delayedInit$body.apply(Main.scala:7)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at observatory.Main$.main(Main.scala:7)
    at observatory.Main.main(Main.scala)

我的RDD的结构由三列组成,基于Tuple3,其中签名是:

TemperatureRecord类型=(LocalDate,Location,Double)

字段LocalDate是来自包Java.time.LocalDate的Java对象。
字段Location是一个定制类型,由两个具有以下签名的Double(GPS坐标)组成:

关于我的应用程序/环境的一些详细信息:

  • Scala:2.11.8
  • 火花芯:2.1.1
  • Spark SQL:2.1.1
  • Linux Ubuntu:16.04LTS

我从本文中了解了如何在DataSet中存储自定义对象?我需要定义自定义编码器,但我没有任何想法:(。

共有1个答案

方河
2023-03-14

问题是Spark没有为常规类找到编码器。到目前为止,Spark只允许使用基元类型作为编码器,对自定义类没有很好的支持。

至于您的情况,假定您的“自定义”类表示日期,您可以使用java.sql.date代替java.time.localdate。好处是您可以利用Spark已经提供的编码器。

import java.sql.Date
case class TempRow(date: Date, loc: Location, temp: Double)

val ds = Seq(TempRow(java.sql.Date.valueOf("2017-06-01"), 
 Location(1.4,5.1), 4.9), TempRow(java.sql.Date.valueOf("2014-04-05"),
 Location(1.5,2.5), 5.5))
 .toDS

ds.show()

+----------+---------+----+
|      date|      loc|temp|
+----------+---------+----+
|2017-06-01|[1.4,5.1]| 4.9|
|2014-04-05|[1.5,2.5]| 5.5|
+----------+---------+----+

检查架构:

ds.printSchema()

root
 |-- date: date (nullable = true)
 |-- loc: struct (nullable = true)
 |    |-- i: double (nullable = false)
 |    |-- j: double (nullable = false)
 |-- temp: double (nullable = false)
case class Location(val i: Double, val j: Double)
class TempRecord(val date: java.time.LocalDate, val loc: Location, val temp: Double)
type TempSerialized = (String, Location, Double)

implicit def fromSerialized(t: TempSerialized): TempRecord = new TempRecord(java.time.LocalDate.parse(t._1), t._2, t._3)
implicit def toSerialized(t: TempRecord): TempSerialized = (t.date.toString, t.loc, t.temp)

// Finally we can create datasets
val d = spark.createDataset(Seq[TempSerialized](
  new TempRecord(java.time.LocalDate.now, Location(1.0,2.0), 3.0), 
  new TempRecord(java.time.LocalDate.now, Location(5.0,4.0), 4.0) )
).toDF("date", "location", "temperature").as[TempSerialized]

d.show()

+----------+---------+-----------+
|      date| location|temperature|
+----------+---------+-----------+
|2017-07-11|[1.0,2.0]|        3.0|
|2017-07-11|[5.0,4.0]|        4.0|
+----------+---------+-----------+

d.printSchema()

root
 |-- date: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- i: double (nullable = false)
 |    |-- j: double (nullable = false)
 |-- temperature: double (nullable = false)
 类似资料:
  • 类似于将JsonNode转换为POJO和将JsonNode转换为java array,但无法找到问题的确切解决方案。 以下是我的POJOs声明: 所以在我的测试中,我想得到一个建筑列表,并将json列表转换/绑定到一个真实对象建筑列表。 以下是我想做的: 但是,我得到以下错误: 显然,如果我在Building类中删除了构造函数,并为字段类型添加了setter,它就会起作用。但是如果我确实有一个要求

  • 当我尝试通过创建以下包时: 我得到以下错误: 值toDF不是组织的成员。阿帕奇。火花rdd。RDD[日志] 我尝试了以下几种方法: 托德弗洛格 我就是不能编译我的代码。 欢迎任何线索来克服这个错误。 我很好地阅读了从case类生成Spark StructType/Schema,并编写了: 但这样做时,我不使用Log类。我想知道是否有一种方法可以通过使用定义的日志类获取数据帧,或者官方/最佳方法是使

  • 我对Spark和Scala相对较新。 我从以下数据帧开始(由密集的双倍向量组成的单列): 直接转换为RDD将生成一个org实例。阿帕奇。火花rdd。RDD[org.apache.spark.sql.Row]: 有人知道如何将此DF转换为org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.向量]的实例吗?到目前为止,我的各种尝试都没有成功。

  • 我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数

  • 我有一个使用框架的应用程序,该框架通过框架中的XML文件提供某些SpringBean。我的应用程序的配置目前部分使用XML完成,但大部分使用Spring注释。 一些XMLBean定义的父项引用框架提供的bean,例如。 FramwworkBean在框架中的XML文件中定义。有一个豆子传承的链条。在每个步骤中,一些条目被添加到上下文中: 我知道这一切的结果是构造一个ClassWithContext实

  • 大家好,我正在尝试使用JAXB marshaller将JAVA对象转换为xml。我使用一个自定义异常处理程序,因为我想摆脱jaxb转换一些特殊字符,例如 这是我的转义字符处理程序类。公共类MepCharacterEscapeHandler实现com。太阳xml。内部的绑定马歇尔。角色逃脱者{ 然而,我得到一个例外如下 谢谢你的帮助我尝试了所有的方法但都没用谢谢