我一直在尝试将RDD转换为数据帧,然后再转换回来。首先,我有一个类型为(Int,Int)的RDD,称为dataPair。然后,我创建了一个带有列标题的DataFrame对象,使用:
val dataFrame = dataPair.toDF(header(0), header(1))
然后我使用以下方法将其从DataFrame转换回RDD:
val testRDD = dataFrame.rdd
它返回RDD类型org.apache.spark.sql.Row(not(Int, Int))。然后我想使用. toDF将其转换回RDD,但我得到一个错误:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
我尝试为testRDD定义类型Data(Int, Int)的Schema,但我得到类型不匹配异常:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
我已经输入了
import sqlContext.implicits._
要从行的RDD创建数据帧,通常有两个主要选项:
1)您可以使用toDF()
,它可以通过import sqlContext.implicits._
导入。但是,这种方法仅适用于以下类型的RDD:
(来源:SQLContext.implicits对象的Scaladoc)
最后一个签名实际上意味着它可以用于元组的RDD或case类的RDD(因为元组和case类是scala.Product的子类)。
因此,要将此方法用于RDD行,必须将其映射到RDD行
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
或
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
这种方法(在我看来)的主要缺点是,您必须在map函数中逐列显式设置结果数据帧的模式。如果您事先不知道模式,也许可以通过编程实现,但那里可能会有点混乱。因此,还有另一种选择:
2) 您可以使用SQLContext对象中提供的createDataFrame(rowRDD:RDD[Row],schema:StructType)。示例:
val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
请注意,无需显式设置任何模式列。我们重用旧DF的模式,它属于STRtType
类,可以轻松扩展。但是,这种方法有时是不可能的,并且在某些情况下效率可能低于第一种方法。
我希望比以前更清楚。干杯
我需要使用 DataFrame上不可用的方法。所有DataFrame方法都只引用DataFrame结果。那么,如何从数据帧数据中创建RDD呢? 注意:这是对 1.2.0 的更改(在 1.3.0 中)。 更新来自@dpangmao的回答:方法是. rdd。我很想知道(a)它是否是公共的,以及(b)它对性能有何影响。 好吧(a)是和(b)-好吧,您可以在这里看到有显着的perf含义:必须通过调用map
有人能分享一下如何将转换为吗?
我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数
我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext
如何将RDD()转换为Dataframe。我使用将dataframe转换为rdd。处理完后,我想把它放回DataFrame中。我怎么能这么做?
我尝试从Kafka加载数据,这是成功的,但我无法转换为火花RDD, 现在如何读取此流对象???我的意思是将其转换为Spark数据帧并执行一些计算 我尝试转换到dataframe 但是toDf不工作错误:value toDf不是org.apache.spark.rdd.RDD的成员[org.apache.spark.sql.行]