当前位置: 首页 > 知识库问答 >
问题:

Spark应用程序如何从DataFrame(Scala)创建CSV文件?

江奕
2023-03-14

我的下一个问题并不新鲜,但我想了解如何一步一步地解决它。

在Spark应用程序中,我创建了数据帧。我们把它叫做df。Spark版本:2.4.0

val df: DataFrame  = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

如何从这个DataFrame创建. csv文件并将csv文件放入服务器中的特定文件夹?

例如,这段代码正确吗?我注意到有些人使用coalesce重新分区来完成这项任务。但我不明白在我的情况下哪一个会更好。

union.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("/home/reports/")

当我尝试使用下一个代码时,它会引发错误:

org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/home/reports/_temporary/0":hdfs:hdfs:drwxr-xr-x 

我以root用户身份运行Spark应用程序<代码>报告由root用户使用下一个命令创建的文件夹:

mkdir -m 777 reports

似乎只有hdfs用户才能写入文件。

共有1个答案

蓬弘
2023-03-14

我相信你对Spark的行为方式感到困惑,我建议你先阅读官方留档和/或一些教程
尽管如此,我希望这能回答你的问题。

此代码将数据帧保存为本地文件系统上的单个CSV文件
它在Ubuntu笔记本电脑上用Spark 2.4.0和Scala 2.12.8测试。

import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .appName("CSV Writter Test")
    .getOrCreate()
import spark.implicits._

val df =
  Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
  ).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.printSchema
// root
//  |-- NAME: string (nullable = true)
//  |-- START_DATE: string (nullable = true)
//  |-- END_DATE: string (nullable = true)
//  |-- STATUS: string (nullable = true)

df.coalesce(numPartitions = 1)
  .write
  .option(key = "header", value = "true")
  .option(key = "sep", value = ",")
  .option(key = "encoding", value = "UTF-8")
  .option(key = "compresion", value = "none")
  .mode(saveMode = "OVERWRITE")
  .csv(path = "file:///home/balmungsan/dailyReport/") // Change the path. Note there are 3 /, the first two are for the file protocol, the third one is for the root folder.

spark.stop()

现在,让我们检查保存的文件。

balmungsan@BalmungSan:dailyReport $ pwd
/home/balmungsan/dailyReport

balmungsan@BalmungSan:dailyReport $ ls
part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv  _SUCCESS

balmungsan@BalmungSan:dailyReport $ cat part-00000-53a11fca-7112-497c-bee4-984d4ea8bbdd-c000.csv 
NAME,START_DATE,END_DATE,STATUS
Alex,2018-01-01 00:00:00,2018-02-01 00:00:00,OUT
Bob,2018-02-01 00:00:00,2018-02-05 00:00:00,IN
Mark,2018-02-01 00:00:00,2018-03-01 00:00:00,IN
Mark,2018-05-01 00:00:00,2018-08-01 00:00:00,OUT
Meggy,2018-02-01 00:00:00,2018-02-01 00:00:00,OUT

存在成功文件,表示写入成功。

  • 您需要指定要保存到本地文件系统而不是HDFS中的//协议
 类似资料:
  • 我试图从一个简单的熊猫数据流创建一个火花数据流。这是我遵循的步骤。 在这之前一切都好。输出为: Spark UI 版本:V2.4.0 主人:本地[*]

  • 我有一个数据框架,数据如下。 我想将上面的数据帧写入csv文件,其中将使用当前时间戳创建文件名。 但此代码工作不正常。给出以下错误 有没有更好的方法来实现这一点,使用scala和火花?此外,即使我试图创建文件与时间戳代码是创建一个目录与时间戳和在该目录内的csv与数据创建一个随机名称.我怎么能有时间戳文件名到这些csv文件,而不是创建一个目录?

  • 问题内容: 我如何使用python中的哪个函数从spark数据帧创建一组?如果标签/结果不是第一列,但我可以引用其列名“状态”,该怎么办? 我使用此.map()函数创建Python数据框: 在reduce函数重新组合了所有Pandas数据帧之后,我将其转换为Spark数据帧。 但是现在如何在Python中从中创建呢?我认为这可能是另一个功能? 问题答案: 如果您已经具有数字功能并且不需要其他转换,

  • 我有一个简单的代码,它将一个pandas数据文件保存到一个csv文件中。到目前为止,它是通过覆盖文件名来工作的,所以每次我运行它时,它只是用一个同名的新文件替换旧文件。是否可以保存此数据文件,但让它按顺序创建新文件,即如果目录中已经有一个名为“filename1”的文件,则创建一个名为“filename2”的新文件,这样原始文件中的数据就不会丢失了?

  • 问题内容: 我刚刚继承了一个Java应用程序,该应用程序需要作为服务安装在XP和Vista上。自从我以任何形式使用Windows至今已有8年了,而且我从未创建过一项服务,更不用说从Java应用程序之类的东西了了(我为该应用程序提供了一个jar,并且有一个依赖项jar-log4j )。使它作为服务运行所需的魔术是什么?我已经掌握了源代码,因此尽管可以避免进行代码修改,但还是可以的。 问题答案: 我对