当前位置: 首页 > 知识库问答 >
问题:

在Spark/Scala中将RDD转换为数据帧

欧桐
2023-03-14

RDD是以数组[数组[字符串]的格式创建的,具有以下值:

val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

我想用模式创建一个数据帧:

val schemaString = "callId oCallId callTime duration calltype swId"

接下来的步骤:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

给出以下错误:

console:45: error: overloaded method value createDataFrame with alternatives:
     (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],   
    org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)

共有3个答案

暴德运
2023-03-14

使用spark 1.6.1和scala 2.10

我得到了相同的错误:重载方法值createDataFrame,带有可选项:

对我来说,gotcha是createDataFrame中的签名,我试图使用val-rdd:List[Row],但它失败了,因为java。util。列出[org.apache.spark.sql.Row]和scala。收集不变的List[org.apache.spark.sql.Row]不一样。

我找到的工作解决方案是通过列表[Array[String]将val-rdd:Array[Array[String]]转换为rdd[Row]。我发现这与文档中的内容最为接近

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val rdd_original : Array[Array[String]] = Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd : List[Array[String]] = rdd_original.toList

val schemaString = "callId oCallId callTime duration calltype swId"

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD to Rows.
val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
// val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works

val df = sqlContext.createDataFrame(sc.parallelize(rowRDD:List[Row]), schema)
df.show

岳亮
2023-03-14

您需要首先将数组转换为行,然后定义模式。我假设您的大多数字段都是长的

    val rdd: RDD[Array[String]] = ???
    val rows: RDD[Row] = rdd map {
      case Array(callId, oCallId, callTime, duration, swId) =>
        Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, swId.toLong)
    }

    object schema {
      val callId = StructField("callId", LongType)
      val oCallId = StructField("oCallId", StringType)
      val callTime = StructField("callTime", StringType)
      val duration = StructField("duration", LongType)
      val swId = StructField("swId", LongType)

      val struct = StructType(Array(callId, oCallId, callTime, duration, swId))
    }

    sqlContext.createDataFrame(rows, schema.struct)
师承弼
2023-03-14

只需粘贴到火花壳中:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

然后在RDD上映射()以创建case类的实例,然后使用toDF()创建数据帧:

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

这从case类推断出模式。

然后您可以继续:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

如果您想在普通程序中(不在火花壳中)使用toDF(),请确保(此处引用):

  • 导入sqlContext。隐含_ 使用toDF()在方法外定义case类
 类似资料:
  • 我有一个如下的CSV文件。 我想把这个转化成下面。 基本上,我想在输出数据帧中创建一个名为idx的新列,该列将填充与键=idx,value=“n”后面的行相同的值“n”。

  • 我用Avro(序列化器和反序列化器)收到Kafka主题的推文。然后,我创建了一个spark consumer,它在RDD[GenericRecord]的数据流中提取推文。现在,我想将每个rdd转换为数据帧,通过SQL分析这些推文。有什么解决方案可以将RDD[GenericRecord]转换为数据帧吗?

  • 我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。

  • 有人能分享一下如何将转换为吗?

  • 我在S3有一些遗留数据,我想使用Spark 2和Java API将它们转换成parquet格式。 我有所需的Avro模式(. avsc文件)及其使用Avro编译器生成的Java类,我想使用这些模式以Parque格式存储数据。输入数据不是任何标准格式,但我有一个库,可以将遗留文件中的每一行转换为Avro类。 是否可以将数据作为

  • 在从< code>RDD制作< code >数据帧时,我遇到了一个错误。 我收到以下错误: py spark . SQL . utils . parse exception:u " \ nmis matched input ' '应为{'SELECT ',' FROM ',' ADD ',' AS ',' ALL ',' DISTINCT ',' WHERE ',' GROUP ',' BY ',