当前位置: 首页 > 知识库问答 >
问题:

如何在spark SCALA中重命名AWS中的spark数据帧输出文件

何兴安
2023-03-14

我将spark数据帧输出保存为带有分区的scala中的csv文件。我在齐柏林飞艇上就是这样做的。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    import sqlContext.implicits._
    import org.apache.spark.{ SparkConf, SparkContext }
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))

val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc) 
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")


val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
        when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
        when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
        when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
        when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
        when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
        when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
        .filter(!$"FFAction|!|".contains("D|!|"))

val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", header)


dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/FinancialLineItem/output")

  val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count

  FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/FinancialLineItem/Descr")

现在文件保存在预期的分区文件夹结构中。

现在,我的要求是重命名所有零件文件并将其保存在一个目录中。文件名将作为文件夹结构的名称。

例如,我有一个文件保存在文件夹/数据分区=日本/分区年=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz

现在我希望我的文件名为

Japan.1971.1.txt.gz
Japan.1971.2.txt.gz

我在我的工作完成后在java map-duce中完成了此操作,然后我正在阅读HDFS文件系统,然后将其移动到不同的位置作为重命名的文件名。

但如何在spark SCALA的AWS S3文件系统中实现这一点。

就我的研究而言,没有直接的方法来重命名spark数据帧输出文件名。

但是有一种实现可以在作业本身中使用saveAsHadoopFile的multipleoutput来完成,但是如何做到呢?。

我正在寻找scala中的一些示例代码

就像完成工作后,我们需要从s3读取文件,对其进行铰孔,然后将其移动到其他位置一样。

共有2个答案

曹智
2023-03-14

好吧,如果您想直接重命名S3 bucket中的文件/对象,这是不可能的。

您可以实现重命名=复制到目标删除源

def prepareNewFilename(oldFilename: String) = {

  val pattern = raw".*/DataPartition=%s/PartitionYear=%s/part-%s.*\.%s"
    .format("([A-Za-z]+)", "([0-9]+)", "([0-9]+)", "([a-z]+)")
    .r

  val pattern(country, year, part, extn) = oldFilename

  "%s.%s.%s.%s.%s".format(country, year, part, "txt", extn)
} 

val oldFilename = "folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz"

val newFilename = prepareNewFilename(oldFilename)
//newFilename: String = Japan.1971.00001.txt.gz
import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.AmazonS3ClientBuilder

val s3 = AmazonS3ClientBuilder.defaultClient()

try {
  s3.copyObject(sourceBkt, oldFilename, targetBkt, newFilename)
  s3.deleteObject(sourceBkt, oldFilename)
} catch {
  case e: AmazonServiceException =>
    System.err.println(e.getErrorMessage)
    System.exit(1)
}
呼延学
2023-03-14
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
  .repartition(1)
  .write
  .mode(SaveMode.Overwrite)
  .format("text")
  .option("codec", "gzip")
  .save(tempOutPath)

import org.apache.hadoop.fs._
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val file = fs.globStatus(new Path("mediamath.dir/part*.gz"))(0).getPath.getName

fs.rename(new Path("mediamath.dir/" + file), new Path(<aws-s3-path>))

这是我的代码片段,请看看这是否对您有帮助。

 类似资料:
  • 本文向大家介绍如何重命名R语言数据帧中的单列,包括了如何重命名R语言数据帧中的单列的使用技巧和注意事项,需要的朋友参考一下 我们可以通过定义新名称来做到这一点,如下所示: 由于数据框中只有一列,因此使用对象名称就足够了。

  • 有没有比调用多个帧更好的方法来同时为给定的 SparkSQL 添加前缀或重命名所有或多个列? 例如,如果我想检测更改(使用完全外连接)。然后我剩下两个具有相同结构的< code >数据帧。

  • 嗨,我有我的火花数据帧的输出,它创建文件夹结构并创建零件文件。现在我必须合并文件夹内的所有零件文件并将该文件重命名为文件夹路径名。 这就是我做分区的方式 它创建如下文件夹结构 我必须创建这样的最终文件 此处没有零件文件bith 001和002合并为两个一个。 我的数据大小非常大300 GB gzip和35 GB zip,因此变得非常慢。 我在这里看到了一个使用spark CSV编写单个CSV文件的

  • 我正在处理一个包含uni_key和createdDate两列的数据帧。我运行一个SQL查询并将结果保存到中,现在我想将这些结果保存到csv文件中。有什么方法可以做到这一点吗?这是一个代码片段: 此代码当前出现以下错误: AttributeError:“DataFrameWriter”对象没有属性“csv”

  • 我正在尝试转换Spark-Scala中的所有标题/列名。到目前为止,我提出了以下代码,它只替换单个列名。

  • 以下脚本: Floor1正在正确执行,Floor2使用相同的df执行,但使用重命名的列则不正确。我得到一个关键错误: 我知道,有一个类似的问题:重命名列后get keyerror 但我并没有真正得到答案,更重要的是,我没有找到解决办法。