当前位置: 首页 > 知识库问答 >
问题:

当第一行是模式时,如何从Spark中的csv(使用scala)创建数据帧?

潘彦
2023-03-14

我是Spark的新手,我正在使用scala编程。我想从HDFS或S3中读取一个文件,并将其转换为Spark数据帧。Csv文件的第一行是模式。但是,如何创建具有未知列的模式的数据框架呢?我使用下面的代码为一个已知的模式创建数据框架。

def loadData(path:String): DataFrame = {

  val rdd = sc.textFile(path);
  val firstLine = rdd.first();
  val schema = StructType(firstLine.split(',').map(fieldName=>StructField(fieldName,StringType,true)));

  val noHeader = rdd.mapPartitionsWithIndex( 
    (i, iterator) => 
      if (i == 0 && iterator.hasNext) { 
         iterator.next 
         iterator 
        } else iterator)


  val rowRDD = noHeader.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4),p(5)))

  val dataFrame = sqlContext.createDataFrame(rowRDD, schema);
  return dataFrame;

}

共有1个答案

何勇
2023-03-14

亲爱的哈米德,你可以试试下面的代码

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val sqlcon = new SQLContext(sc)
//comma separated list of columnName:type

def main(args:Array[String]){
var schemaString ="Id:int,FirstName:text,LastName:text,Email:string,Country:text"
val schema =
      StructType(
        schemaString.split(",").map(fieldName => StructField(fieldName.split(":")(0),
          getFieldTypeInSchema(fieldName.split(":")(1)), true)))
val rdd=sc.textFile("/users.csv")
val noHeader = rdd.mapPartitionsWithIndex( 
(i, iterator) => 
  if (i == 0 && iterator.hasNext) { 
     iterator.next 
     iterator 
    } else iterator)
 val rowRDDx =noHeader.map(p => {
      var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
      var index = 0
      var tokens = p.split(",")
      tokens.foreach(value => {
        var valType = schema.fields(index).dataType
        var returnVal: Any = null
        valType match {
          case IntegerType => returnVal = value.toString.toInt
          case DoubleType => returnVal = value.toString.toDouble
          case LongType => returnVal = value.toString.toLong
          case FloatType => returnVal = value.toString.toFloat
          case ByteType => returnVal = value.toString.toByte
          case StringType => returnVal = value.toString
          case TimestampType => returnVal = value.toString
        }
        list = list :+ returnVal
        index += 1
      })
      Row.fromSeq(list)
    })
val df = sqlcon.applySchema(rowRDDx, schema)
}
def getFieldTypeInSchema(ftype: String): DataType = {

    ftype match {
      case "int" => return IntegerType
      case "double" => return DoubleType
      case "long" => return LongType
      case "float" => return FloatType
      case "byte" => return ByteType
      case "string" => return StringType
      case "date" => return TimestampType
      case "timestamp" => return StringType
      case "uuid" => return StringType
      case "decimal" => return DoubleType
      case "boolean" => BooleanType
      case "counter" => IntegerType
      case "bigint" => IntegerType
      case "text" => return StringType
      case "ascii" => return StringType
      case "varchar" => return StringType
      case "varint" => return IntegerType
      case default => return StringType
    }
  }

希望它能帮助你:)

 类似资料:
  • 我有一个小数据集,它将是Spark工作的结果。为了方便起见,我正在考虑在作业结束时将此数据集转换为数据帧,但很难正确定义模式。问题是下面的最后一个字段(

  • 我的下一个问题并不新鲜,但我想了解如何一步一步地解决它。 在Spark应用程序中,我创建了数据帧。我们把它叫做df。Spark版本:2.4.0 如何从这个DataFrame创建文件并将csv文件放入服务器中的特定文件夹? 例如,这段代码正确吗?我注意到有些人使用或来完成这项任务。但我不明白在我的情况下哪一个会更好。 当我尝试使用下一个代码时,它会引发错误: 我以root用户身份运行Spark应用程

  • Scala\u Spark\u DataFrameReader\u csv的文档表明,Spark可以记录在读取时检测到的格式错误的行。csv文件 -如何记录格式错误的行 -是否可以获取包含格式错误行的val或var? 链接文档中的选项是:maxMalformedLogPerPartition(默认值10):设置Spark将为每个分区记录的最大错误行数。超过此数字的格式错误记录将被忽略

  • 我想使用Spark和Scala强制转换dataframe的模式以更改某些列的类型。 具体地说,我正在尝试使用AS[U]函数,其描述为:“返回一个新的数据集,其中每个记录都映射到指定的类型。用于映射列的方法取决于U的类型。” 原则上,这正是我想要的,但我不能使它起作用。 下面是一个取自https://github.com/apache/spark/blob/master/sql/core/src/t

  • 我有一些像这样的JSON数据: 我可以在以下位置阅读: 我可以打印模式与 我可以显示数据。显示(10,false) 我可以打印/读取模式。模式为: 我可以打印出来更好: 现在,如果我在同一个文件中阅读,没有注释和回复行,val df2=sqlContext。阅读json(“/data/partialRevOnly.json”)只要删除这些行,我就可以用打印模式得到这样的结果: 我不喜欢这样,所以我