当前位置: 首页 > 知识库问答 >
问题:

如何将RDD[行]转换回DataFrame[副本]

松嘉颖
2023-03-14

我一直在尝试将RDD转换为数据帧,然后再转换回来。首先,我有一个类型为(Int,Int)的RDD,称为dataPair。然后,我创建了一个带有列标题的DataFrame对象,使用:

val dataFrame = dataPair.toDF(header(0), header(1))

然后我使用以下方法将其从DataFrame转换回RDD:

val testRDD = dataFrame.rdd

它返回RDD类型org.apache.spark.sql.Row(not(Int, Int))。然后我想使用. toDF将其转换回RDD,但我得到一个错误:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

我尝试为testRDD定义类型Data(Int, Int)的Schema,但我得到类型不匹配异常:

error: type mismatch;
found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
    val testRDD: RDD[Data] = dataFrame.rdd
                                       ^

我已经输入了

import sqlContext.implicits._

共有1个答案

谭京
2023-03-14

要从行的RDD创建数据帧,通常有两个主要选项:

1)您可以使用toDF(),它可以通过import sqlContext.implicits._导入。但是,这种方法仅适用于以下类型的RDD:

(来源:SQLContext.implicits对象的Scaladoc)

最后一个签名实际上意味着它可以用于元组的RDD或case类的RDD(因为元组和case类是scala.Product的子类)。

因此,要将此方法用于RDD行,必须将其映射到RDD行

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

这种方法(在我看来)的主要缺点是,您必须在map函数中逐列显式设置结果数据帧的模式。如果您事先不知道模式,也许可以通过编程实现,但那里可能会有点混乱。因此,还有另一种选择:

2) 您可以使用SQLContext对象中提供的createDataFrame(rowRDD:RDD[Row],schema:StructType)。示例:

val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

请注意,无需显式设置任何模式列。我们重用旧DF的模式,它属于STRtType类,可以轻松扩展。但是,这种方法有时是不可能的,并且在某些情况下效率可能低于第一种方法。

我希望比以前更清楚。干杯

 类似资料:
  • 我需要使用 DataFrame上不可用的方法。所有DataFrame方法都只引用DataFrame结果。那么,如何从数据帧数据中创建RDD呢? 注意:这是对 1.2.0 的更改(在 1.3.0 中)。 更新来自@dpangmao的回答:方法是. rdd。我很想知道(a)它是否是公共的,以及(b)它对性能有何影响。 好吧(a)是和(b)-好吧,您可以在这里看到有显着的perf含义:必须通过调用map

  • 有人能分享一下如何将转换为吗?

  • 我使用的是Apache Spark 1.6.2 我有一个。csv数据,它包含大约800万行,我想把它转换成DataFrame 映射RDD可以很好地工作,但是当涉及到将RDD转换为DataFrame时,Spark引发了一个错误 以下是我的代码: 有超过800万行,但是当我将这些行减到只有<500行时,程序就可以正常工作了 数据很乱,每行中的总列经常不同,这就是为什么我需要首先映射它。但是,我想要的数

  • 我试图将JDBC的ResultSet转换成Spark RDD,并寻找一种有效的方法来使用Spark的并行特性。 以下是我按照这个https://stackoverflow.com/a/32073423/6064131实现的 现在的主要问题是它需要更多的时间,我知道所有数据集都是通过一根针提取的eye.But有没有更好的方法来实现这一点? 有些人可能想知道为什么我没有使用内置功能sqlContext

  • 如何将RDD()转换为Dataframe。我使用将dataframe转换为rdd。处理完后,我想把它放回DataFrame中。我怎么能这么做?

  • 我尝试从Kafka加载数据,这是成功的,但我无法转换为火花RDD, 现在如何读取此流对象???我的意思是将其转换为Spark数据帧并执行一些计算 我尝试转换到dataframe 但是toDf不工作错误:value toDf不是org.apache.spark.rdd.RDD的成员[org.apache.spark.sql.行]