当前位置: 首页 > 知识库问答 >
问题:

使用时间路径的火花写入操作HDFS

栾和玉
2023-03-14

我正在尝试从这个Scala代码写入csv文件。我使用HDFS作为临时目录,然后writer.write在现有子文件夹中创建一个新文件。我收到以下错误消息:

val inputFile = "s3a:/tfsdl-ghd-wb/raidnd/rawdata.csv" //  INPUT path 
val outputFile = "s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv" //  OUTPUT path 
val dateFormat = new SimpleDateFormat("yyyyMMdd")
val fileSystem = getFileSystem(inputFile)
val inputData = readCSVFile(fileSystem, inputFile, skipHeader = true).toSeq

val writer = new PrintWriter(new File(outputFile))
writer.write("Sales,cust,Number,Date,Credit,SKU\n")
filtinp.foreach(x => {
  val (com1, avg1) = com1Average(filtermp, x)
  val (com2, avg2) = com2Average(filtermp, x)
  writer.write(s"${x.Date},${x.cust},${x.Number},${x.Credit}\n")
})
writer.close()

def getFileSystem(path: String): FileSystem = {
val hconf = new Configuration() // initialize new hadoop configuration
new Path(path).getFileSystem(hconf) // get new filesystem to handle data

java.io./tfsdl-ghd-wb/raidnd/Incte_19

如果我选择新建文件或退出文件,也会发生同样的情况,我已经检查了路径是否正确,只想在其中创建一个新文件。

问题是,为了使用基于文件系统的源写入数据,您需要一个临时目录,这是Spark使用的提交机制的一部分,即数据首先写入临时目录,一旦任务完成,就会自动将处理过的文件移动到最终路径。

我是否应该将每个Spark应用程序的临时文件夹路径更改为S3?我认为最好在本地处理(本地文件HDFS),然后将处理后的输出文件上载到S3

此外,我刚刚看到我使用的databricks集群中没有“无火花配置集”,这会干扰问题吗?

共有1个答案

段干河
2023-03-14

如果您能够以DataFrame的形式使用火花/scala读取原始数据,那么您可以对数据帧执行转换以构建最终的数据帧。一旦您有了最终的数据帧,那么需要将其写入csv文件,您可以使用下面一行代码将csv文件保存到s3桶路径或hdfs路径。

df.write.format('csv').option('header','true').mode('overwrite').option('sep',',').save('s3a:/tfsdl-ghd-wb/raidnd/Incte_19&20.csv')
 类似资料:
  • 我创建并持久化一个df1,然后在其上执行以下操作: 我有一个有16个节点的集群(每个节点有1个worker和1个executor,4个内核和24GB Ram)和一个master(有15GB Ram)。Spark.shuffle.Partitions也是192个。它挂了2个小时,什么也没发生。Spark UI中没有任何活动。为什么挂这么久?是dagscheduler吗?我怎么查?如果你需要更多的信息

  • 我试图运行火花程序,在纱线客户端模式下使用火花提交,并获得类NotFindException。所以我的问题是我应该在哪个参数中传递我的jar(--jars或--drier-class-path)。 Spark=2.0.0 HDP 2.5 Hadoop=2.7.3

  • 我正在尝试使用Databricks的spark-csv2.10依赖关系将一个数据帧写入到HDFS的*.csv文件。依赖关系似乎可以正常工作,因为我可以将.csv文件读入数据帧。但是当我执行写操作时,我会得到以下错误。将头写入文件后会出现异常。 当我将查询更改为时,write工作很好。 有谁能帮我一下吗? 编辑:根据Chandan的请求,这里是的结果

  • 我想知道如何使用JAVA从SparkSQL中的领域特定语言(DSL)函数调用UDF函数。 我有UDF函数(仅举例): 我已经注册到sqlContext了 当我运行下面的查询时,我的UDF被调用,我得到一个结果。 我将使用Spark SQL中特定于域的语言的函数转换此查询,但我不确定如何进行转换。 我发现存在调用 UDF() 函数,其中其参数之一是函数 fnctn 而不是 UDF2。如何使用 UDF

  • 当我尝试将数据帧写入Hive Parket分区表时 它将在HDFS中创建大量块,每个块只有少量数据。 我了解它是如何进行的,因为每个 spark 子任务将创建一个块,然后将数据写入其中。 我也理解,块数会提高Hadoop的性能,但达到阈值后也会降低性能。 如果我想自动设置数字分区,有人有一个好主意吗?

  • 前面小节介绍了 Linux 目录相关的知识点,相信读者已经对 Linux 系统有了一定的认识和理解,本小节围绕路径相关的操作来介绍如何使用 cd 切换到不同的路径,另外还介绍如何使用 Tab 键自动补全文件或目录名。 1. pwd 查看当前所在目录的绝对路径 在对 Linux 目录操作过程中,可以使用 pwd 命令查看当前所处目录的 绝对路径: pwd 执行结果如下图: Tips:如图所示显示的