当前位置: 首页 > 知识库问答 >
问题:

使用Scala和SparkSql并导入带有头[重复]的CSV文件

秦宜修
2023-03-14

我对Spark和Scala非常陌生(比如两个小时的新体验),我正在尝试玩CSV数据文件,但我无法做到,因为我不确定如何处理“标题行”,我在互联网上搜索了加载或跳过它的方法,但我真的不知道怎么做。我正在粘贴我正在使用的代码,请帮助我。

object TaxiCaseOne{

case class NycTaxiData(Vendor_Id:String, PickUpdate:String, Droptime:String, PassengerCount:Int, Distance:Double, PickupLong:String, PickupLat:String, RateCode:Int, Flag:String, DropLong:String, DropLat:String, PaymentMode:String, Fare:Double, SurCharge:Double, Tax:Double, TripAmount:Double, Tolls:Double, TotalAmount:Double)

def mapper(line:String): NycTaxiData = {
val fields = line.split(',')  

val data:NycTaxiData = NycTaxiData(fields(0), fields(1), fields(2), fields(3).toInt, fields(4).toDouble, fields(5), fields(6), fields(7).toInt, fields(8), fields(9),fields(10),fields(11),fields(12).toDouble,fields(13).toDouble,fields(14).toDouble,fields(15).toDouble,fields(16).toDouble,fields(17).toDouble)
return data
}def main(args: Array[String]) {

// Set the log level to only print errors
Logger.getLogger("org").setLevel(Level.ERROR)
 // Use new SparkSession interface in Spark 2.0
val spark = SparkSession
  .builder
  .appName("SparkSQL")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
  .getOrCreate()
val lines = spark.sparkContext.textFile("../nyc.csv")

val data = lines.map(mapper)

// Infer the schema, and register the DataSet as a table.
import spark.implicits._
val schemaData = data.toDS

schemaData.printSchema()

schemaData.createOrReplaceTempView("data")

// SQL can be run over DataFrames that have been registered as a table
val vendor = spark.sql("SELECT * FROM data WHERE Vendor_Id == 'CMT'")

val results = teenagers.collect()

results.foreach(println)

spark.stop()
  }
}

共有1个答案

邓俊英
2023-03-14

如果你有一个CSV文件,你应该使用spark CSV来读取CSV文件,而不是使用textFile

val spark = SparkSession.builder().appName("test val spark = SparkSession
  .builder
  .appName("SparkSQL")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
  .getOrCreate()

val df = spark.read
        .format("csv")
        .option("header", "true") //This identifies first line as header
        .csv("../nyc.csv")

您需要一个spark core和spark sql依赖项来处理这个问题

希望这有帮助!

 类似资料:
  • 本文向大家介绍mysql 导出CSV文件 并带表头的方法,包括了mysql 导出CSV文件 并带表头的方法的使用技巧和注意事项,需要的朋友参考一下 参考官方文档 http://dev.mysql.com/doc/refman/5.7/en/select-into.html 实例如下: 先查看一下结果 导出CSV文件 以上这篇mysql 导出CSV文件 并带表头的方法就是小编分享给大家的全部内容了,

  • 我正试图将数据从spark dataframe导出到。csv文件: 它正在创建一个文件名为“Part-R-00001-512872F2-9B51-46C5-B0EE-31D626063571.csv” 我希望文件名为“part-r-00000.csv”或“part-00000.csv”

  • 2)如果数据集不为空,则打印csv文件中的数据的标题 需要第一点的解决方案 第二点是使用此代码

  • 当我导入包含一些国家/地区的CSV文件时,我遇到了一些字符的问题。它没有很好的编码然后我得到了?标记,而不是CSV文件中写入的字符。这里有一些国家让我面临这个问题:奥兰群岛、圣巴特勒米、科特迪瓦、库拉索岛。 下面是导入csv文件的代码: 首先我使用了FileReader,所有这些国家都出现了问题,然后我改为InputStreamReader并添加了这个UTF-8字符集,问题几乎解决了。当我使用字符

  • 我正在使用http://csvjdbc.sourceforge.net/doc.html要将磁盘上的CSV文件(例如“myDir”中的“myFile”)视为SQL DB,我可以使用SQL语法进行查询: 这工作正常,但是当CSV文件没有头时,我遇到了麻烦。在那种情况下,第一数据线被认为是报头并且因此不像其他数据线那样被读取。 有没有办法告诉查询不要寻找标头,而将第一行视为数据输入?

  • 当您尝试使用emptyDF创建csv文件时,Spark会创建一个没有头的空文件,即使header选项是true(header=true) 是否可以为EMPTYDF创建带有头的csv文件?