当前位置: 首页 > 知识库问答 >
问题:

将Spark输出CSV文件与单个标头合并

柯昆
2023-03-14

我想在AWS中创建一个数据处理管道,以最终将处理后的数据用于机器学习

我有一个Scala脚本,它从S3获取原始数据,对其进行处理并将其写入HDFS,甚至使用Spark CSV将其写入S3。如果我想使用AWS机器学习工具来训练预测模型,我想我可以使用多个文件作为输入。但如果我想使用其他东西,我认为最好是接收单个CSV输出文件。

目前,由于我不想为了性能目的而使用重新分区(1)或合并(1),我已经使用hadoop fs-getmerge进行手动测试,但由于它只是合并作业输出文件的内容,我遇到了一个小问题。我需要数据文件中的一行标题来训练预测模型。

如果我使用<代码>。选项(“header”,“true”)用于spark csv,然后它将头写入每个输出文件,合并后,数据中的头行数与输出文件中的头行数相同。但如果header选项为false,则不会添加任何头。

现在,我找到了一个选项,可以将Scala脚本中的文件与Hadoop API的FileUtil合并。复制合并。我用下面的代码在spark shell中进行了尝试。

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

但是这个解决方案仍然只是将文件连接在一起,而不处理标题。如何获取只有一行标题的输出文件?

我甚至尝试添加df.columns.mkString(",")作为CopyMerge的最后一个参数,但这仍然多次添加标头,而不是一次。

共有3个答案

芮化
2023-03-14

我们有一个类似的问题,遵循以下方法来获取单个输出文件-

  1. 将数据帧写入带有标题的hdfs,而不使用合并或重新分区(转换后)
dataframe.write.format("csv").option("header", "true").save(hdfs_path_for_multiple_files)
dataframe = spark.read.option('header', 'true').csv(hdfs_path_for_multiple_files)

dataframe.coalesce(1).write.format('csv').option('header', 'true').save(hdfs_path_for_single_file)

这样,在执行转换时,您将避免与合并或重新分区相关的性能问题(步骤1)。第二步提供具有一个标题行的单个输出文件。

轩辕晔
2023-03-14
  1. 使用dataframe输出标头。架构(val header=dataDF.schema.fieldNames.reduce(\u“,”\ u))
子车征
2023-03-14

你可以这样到处走。

  • 1.创建一个包含标头名称的新DataFrame(HeaderDF)。
  • 2.将其与包含数据的DataFrame(dataDF)联合。
  • 3.使用选项(“标头”、“false”)将联合编辑的DataFrame输出到磁盘。
  • 使用hadoop FileUtil4.merge分区文件(part-0000**0.csv)

通过这种方式,除了单个分区的内容具有来自headerDF的一行头名称外,所有分区都没有头。当所有分区合并在一起时,文件顶部只有一个标头。示例代码如下

  //dataFrame is the data to save on disk
  //cast types of all columns to String
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only header names
  import scala.collection.JavaConverters._
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //merge header names with data
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
 类似资料:
  • 我正试图将数据从spark dataframe导出到。csv文件: 它正在创建一个文件名为“Part-R-00001-512872F2-9B51-46C5-B0EE-31D626063571.csv” 我希望文件名为“part-r-00000.csv”或“part-00000.csv”

  • 问题内容: 运行后,结果类似于: 第一个元素是我已提取到名为的文件的内联javascript 这可行,但我想将 所有3个文件合并为一个文件 我尝试使用filesmerge.com合并JS文件,但这在引用单个文件时导致错误: 然后,我尝试使用jscompress.com进行合并,尽管这不会产生任何错误,但未呈现react root元素 我也尝试过在create-react-app repo上建议的此

  • 问题内容: 我编写了一个Python脚本,用于合并两个csv文件,现在我想在最终的csv中添加标头。我尝试按照此处报告的建议进行操作,但出现以下错误:。解决此问题的最有效方法是什么? 这是我正在使用的代码: 问题答案: 该班预计 字典 的每一行。如果您只想编写一个初始标头,请使用常规并在标头的简单行中传递: 另一种方法是在跨数据复制时生成字典:

  • 我有一个商业案例,使用Spring batch将多个csv文件(每个文件大约1000个,包含1000条记录)合并成单个csv。 请帮助我提供方法和性能方面的指导和解决方案。 到目前为止,我已经尝试了两种方法, 方法1。 Tasklet chunk与multiResourceItemReader一起从目录中读取文件,FlatFileItemWriter作为项目编写器。 这里的问题是,它的处理速度非常

  • 如果我有一些文件,每个文件都有一个页面,我想使用 我在这里读到了几十个关于相同的问题,但大多数问题都已经过时或不再适用于