当前位置: 首页 > 知识库问答 >
问题:

火花加载CSV文件作为数据帧?

娄鹤轩
2023-03-14

我想在spark中读取一个CSV,将其转换为DataFrame,并使用df.registertemptable(“table_name”)将其存储在HDFS

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
    at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
    at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
    at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
    at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在Apache Spark中将CSV文件加载为DataFrame的正确命令是什么?

共有1个答案

赵经国
2023-03-14

spark-csv是核心Spark功能的一部分,不需要单独的库。所以你可以比方说

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

在scala中(这适用于任何格式--在分隔符中提到“,”用于csv,“\t”用于tsv等)

val df=sqlcontext.read.format(“com.databricks.spark.csv”).option(“delimiter”,“,”).load(“csvfile.csv”)

 类似资料:
  • 然后我跑: 然后我得到: IllegalArgumentException:需求失败:列数不匹配。旧列名(1):值新列名(5):startIP,endIP,City,Longitude,Latitude at scala.predef$.require(predef.scala:224)at org.apache.spark.sql.dataset.todf(dataset.scala:376)a

  • 我使用spark-core 2.0.1版和Scala2.11。我有一个简单的代码来读取一个包含\escapes的csv文件。 null 有人面临同样的问题吗?我是不是漏掉了什么? 谢谢

  • 我有5个表存储为CSV文件(A.CSV、B.CSV、C.CSV、D.CSV、E.CSV)。每个文件按日期分区。如果文件夹结构如下:

  • 我正在尝试使用Databricks的spark-csv2.10依赖关系将一个数据帧写入到HDFS的*.csv文件。依赖关系似乎可以正常工作,因为我可以将.csv文件读入数据帧。但是当我执行写操作时,我会得到以下错误。将头写入文件后会出现异常。 当我将查询更改为时,write工作很好。 有谁能帮我一下吗? 编辑:根据Chandan的请求,这里是的结果

  • 我是Spark的新手。我尝试在本地模式(windows)下使用spark java将csv文件保存为parquet。我得到了这个错误。 原因:org.apache.spark.Spark异常:写入行时任务失败 我引用了其他线程并禁用了spark推测 set("spark.speculation "," false ") 我还是会出错。我在csv中只使用了两个专栏进行测试。 输入: 我的代码: 请帮

  • 我有麻烦重命名基于csv的数据帧的标头。 我得到了以下数据帧:df1: 现在我想根据csv文件更改列名(第一行),如下所示: 因此,我期望数据帧如下所示: 有什么想法吗?感谢您的帮助:)