当前位置: 首页 > 知识库问答 >
问题:

如何优化Spark作业将S3文件处理到Hive Parquet表中

公良信然
2023-03-14
    null
private def processLines(lines: RDD[String]): DataFrame = {
    val updatedLines = lines.mapPartitions(row => ...)
    spark.createDataFrame(updatedLines, schema)
}

// Read S3 files and repartition() and cache()
val lines: RDD[String] = spark.sparkContext
    .textFile(pathToFiles, numFiles) 
    .repartition(2 * numFiles) // double the parallelism
    .cache()

val numRawLines = lines.count()

// Custom process each line and cache table
val convertedLines: DataFrame = processLines(lines)
convertedRows.createOrReplaceTempView("temp_tbl")
spark.sqlContext.cacheTable("temp_tbl")
val numRows = spark.sql("select count(*) from temp_tbl").collect().head().getLong(0)

// Select a subset of the data
val myDataFrame = spark.sql("select a, b, c from temp_tbl where field = 'xxx' ")

// Define # of parquet files to write using coalesce
val numParquetFiles = numRows / 1000000
var lessParts = myDataFrame.rdd.coalesce(numParquetFiles)
var lessPartsDataFrame = spark.sqlContext.createDataFrame(lessParts, myDataFrame.schema)
lessPartsDataFrame.createOrReplaceTempView('my_view')

// Insert data from view into Hive parquet table
spark.sql("insert overwrite destination_tbl 
           select * from my_view")    
lines.unpersist()

应用程序读取所有S3文件=>重新分区到文件数量的两倍=>缓存RDD=>自定义处理每行=>创建临时视图/缓存表=>计数num行=>选择数据子集=>减少分区数量=>创建数据子集的视图=>使用视图插入配置单元目标表=>取消RDD持久化。

我不确定为什么执行要花很长时间。是spark执行参数设置不正确,还是这里调用了一些错误的东西?

共有1个答案

姜德容
2023-03-14

在查看度量标准之前,我将尝试对代码进行以下更改。

private def processLines(lines: DataFrame): DataFrame = {
  lines.mapPartitions(row => ...)
}

val convertedLinesDf = spark.read.text(pathToFiles)
    .filter("field = 'xxx'")
    .cache()

val numLines = convertedLinesDf.count() //dataset get in memory here, it takes time        
// Select a subset of the data, but it will be fast if you have enough memory
// Just use Dataframe API
val myDataFrame = convertedLinesDf.transform(processLines).select("a","b","c")

//coalesce here without converting to RDD, experiment what best
myDataFrame.coalesce(<desired_output_files_number>)
  .write.option(SaveMode.Overwrite)
  .saveAsTable("destination_tbl")
  • 如果不计算行数,缓存就毫无用处。它将占用一些内存并增加一些GC压力
  • 缓存表可能会消耗更多内存并增加更多GC压力
  • 将Dataframe转换为RDD代价很高,因为它意味着ser/deser操作
  • 不确定要做什么:val numParquetFiles=numRows/1000000和重新分区(2*numfiles)。在您的设置中,1000个每个30MB的文件将为您提供1000个分区。像这样可以很好。调用重新分区和合并可能会触发代价高昂的洗牌操作。(合并可能不会触发洗牌)

如果你有什么改进就告诉我!

 类似资料:
  • 真的...已经讨论了很多。 然而,有很多模棱两可之处,提供的一些答案。。。包括在JAR/执行器/驱动程序配置或选项中复制JAR引用。 应为每个选项澄清以下歧义、不清楚和/或省略的细节: 类路径如何受到影响 驾驶员 执行器(用于正在运行的任务) 两者都有 一点也不 对于任务(给每个执行者) 方法 方法 或 ,或者 别忘了,spack-提交的最后一个参数也是一个. jar文件。 我知道在哪里可以找到主

  • null 问题1:Spark如何并行处理? 我想大部分的执行时间(99%?)上面的解决方案是从USB驱动器中读取1TB文件到Spark集群中。从USB驱动器读取文件是不可并行的。但是在读取整个文件之后,Spark在底层做了什么来并行处理呢? > 有多少节点用于创建DataFrame?(也许只有一个?) 假设Snappy压缩的Parquet文件小10倍,大小=100GB,HDFS块大小=128 MB

  • 注意:计数是对处理文件需要多长时间的更多调试。这项工作几乎花了一整天的时间,超过10个实例,但仍然失败,错误发布在列表的底部。然后我找到了这个链接,它基本上说这不是最佳的:https://forums.databricks.com/questions/480/how-do-i-ingest-a-large-number-of-files-from-s3-my.html 然后,我决定尝试另一个我目前

  • 我正在处理IDE中制作一个Java处理项目,并希望将其传播到多个PDE(处理源代码)文件中。 我无法使用导入将Java处理源代码文件导入为Java类文件。 <代码>配置。pde <代码>项目。pde 返回导入项目/配置无法解析 分别为。 我必须先编译PDE文件吗?是否可以将处理IDE设置为每次运行都自动执行? 太长,读不下去了 拥有此项目文件夹: 如何使用配置中的函数和变量。项目中的pde。pde

  • 是的...已经讨论了很多了。 但是,有很多不明确的地方,提供了一些答案...包括在jars/executor/driver配置或选项中重复jar引用。 类路径的影响 驱动程序 执行程序(用于正在运行的任务) 两者 一点也不 对于任务(对每个执行者) 用于远程驱动程序(如果在群集模式下运行) 方法 方法 或 或 不要忘记,spark-submit的最后一个参数也是一个.jar文件。 如果我从文档中猜