当前位置: 首页 > 知识库问答 >
问题:

如何在EMR上调整火花作业以在S3上快速写入大量数据

冯宪
2023-03-14

我有一个很好的工作,我在两个数据帧之间进行外部连接。第一个数据帧的大小为260 GB,文件格式为文本文件,分为2200个文件,第二个数据帧的大小为2GB。然后,将大约260 GB的数据帧输出写入S3需要很长的时间,因为我在EMR上做了很大的更改,所以我取消了这一操作,之后的2个多小时。

这是我的集群信息。

emr-5.9.0
Master:    m3.2xlarge
Core:      r4.16xlarge   10 machines (each machine has 64 vCore, 488 GiB memory,EBS Storage:100 GiB)

这是我正在设置的群集配置

capacity-scheduler  yarn.scheduler.capacity.resource-calculator :org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
emrfs-site  fs.s3.maxConnections:   200
spark   maximizeResourceAllocation: true
spark-defaults  spark.dynamicAllocation.enabled:    true

我尝试设置内存组件手动也像下面和性能是更好的但同样的事情它是再次采取很长的时间

--num executors 60—conf spark。纱线执行人。memoryOverhead=9216--执行器内存72G--conf spark。纱线驾驶员memoryOverhead=3072--驱动程序内存26G--执行器内核10--驱动程序内核3--conf spark。违约平行度=1200

我没有使用默认分区将数据保存到S3。

添加有关作业和查询计划的所有详细信息,以便易于理解。

真正的原因是分区。这占用了大部分时间。因为我有2K个文件,所以如果我使用像200个这样的重新分区,输出文件将达到10万个,然后在spark中再次加载不是一个好故事。

最后,这里是我的代码,我在这里执行连接,然后保存到S3。。。

import org.apache.spark.sql.expressions._

          val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").desc)
          val latestForEachKey = df2resultTimestamp.withColumn("rank", row_number.over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")

          val columnMap = latestForEachKey.columns.filter(c => c.endsWith("_1") & c != "FFAction|!|_1").map(c => c -> c.dropRight(2)) :+ ("FFAction|!|_1", "FFAction|!|")
          val exprs = columnMap.map(t => coalesce(col(s"${t._1}"), col(s"${t._2}")).as(s"${t._2}"))
          val exprsExtended = Array(col("uniqueFundamentalSet"), col("PeriodId"), col("SourceId"), col("StatementTypeCode"), col("StatementCurrencyId"), col("FinancialStatementLineItem_lineItemId")) ++ exprs

          //Joining both dara frame here
          val dfMainOutput = (dataMain.join(latestForEachKey, Seq("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "FinancialStatementLineItem_lineItemId"), "outer") select (exprsExtended: _*)).filter(!$"FFAction|!|".contains("D|!|"))
          //Joing ends here

          val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition", $"PartitionYear", $"PartitionStatement", concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").filter(_ != "PartitionStatement").map(c => col(c)): _*).as("concatenated"))

          val headerColumn = dataHeader.columns.toSeq

          val headerFinal = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

          val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", headerFinal)

          //  dfMainOutputFinalWithoutNull.repartition($"DataPartition", $"PartitionYear", $"PartitionStatement")
  .write
  .partitionBy("DataPartition", "PartitionYear", "PartitionStatement")
  .format("csv")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "bzip2")
  .save(outputFileURL)

共有2个答案

索令
2023-03-14

S3是一个对象存储,而不是一个文件系统,因此,最终一致性、非原子重命名操作会产生问题,即每次执行者写入作业结果时,他们中的每个人都会写入主目录(在S3上)之外的临时目录,在主目录中必须写入文件,一旦所有执行者都完成了重命名,以获得原子排他性。在hdfs这样的标准文件系统中,重命名是即时的,但在S3这样的对象存储中,这一切都很好,因为S3上的重命名是以6MB/s的速度完成的。

要克服上述问题,请确保设置以下两个conf参数

1) 火花。hadoop。mapreduce。fileoutputcommitter。算法。版本=2

对于此参数的默认值(即1),commitTask将任务生成的数据从任务临时目录移动到作业临时目录,当所有任务完成时,commitJob将数据从作业临时目录移动到最终目标。因为驱动程序正在执行commitJob的工作,所以对于S3,此操作可能需要很长时间。用户可能经常认为他/她的手机“挂起了”。但是,当mapreduce的值。fileoutputcommitter。算法。版本为2,commitTask将把任务生成的数据直接移动到最终目标,commitJob基本上是不可操作的。

2) 火花。推测=错误

如果此参数设置为true,则如果一个或多个任务在某个阶段中运行缓慢,则将重新启动这些任务。如上所述,通过spark作业在S3上的写入操作非常慢,因此我们可以看到随着输出数据大小的增加,许多任务被重新启动。

这一点以及最终的一致性(当将文件从临时目录移动到主数据目录时)可能会导致FileOutputServer进入死锁,因此作业可能会失败。

或者说

您可以先将输出写入EMR上的本地HDF,然后使用hadoop distcp命令将数据移动到S3。这大大提高了整体输出速度。但是,您需要在EMR节点上有足够的EBS存储,以确保所有输出数据都适合。

此外,您可以以ORC格式写入输出数据,这将大大压缩输出大小。

参考:

https://medium.com/@Subhojit20731/apache-spark-and-amazon-s3-gotchas-and-best-practices-a767242f3d98

墨高杰
2023-03-14

您正在运行五个c3.4大EC2实例,每个实例有30gb的RAM。所以总共只有150GB,比你的小得多

 类似资料:
  • 我在Spark 2.1.0/Cassandra 3.10集群(4台机器*12个内核*256个RAM*2个SSD)上工作,很长一段时间以来,我一直在努力使用Spark Cassandra connector 2.0.1向Cassandra写入特定的大数据帧。 这是我的表的模式 用作主键的散列是256位;列表字段包含多达1MB的某种结构化类型的数据。总共,我需要写几亿行。 目前,我正在使用以下写入方法

  • 首先,我想说的是我看到的解决这个问题的唯一方法是:Spark 1.6.1 SASL。但是,在为spark和yarn认证添加配置时,仍然不起作用。下面是我在Amazon's EMR的一个yarn集群上使用spark-submit对spark的配置: 注意,我用代码将spark.authenticate添加到了sparkContext的hadoop配置中,而不是core-site.xml(我假设我可以

  • 尝试从/向redshift读/写(s3中的数据)。但在访问数据帧时会出现奇怪的错误。我可以看到正在创建数据帧,并且它能够访问数据,因为它输出表的列名

  • 类似的问题,但没有足够的观点来评论。 根据最新的Spark文档,< code>udf有两种不同的用法,一种用于SQL,另一种用于DataFrame。我找到了许多关于如何在sql中使用< code>udf的例子,但是还没有找到任何关于如何在数据帧中直接使用< code>udf的例子。 o.p.针对上述问题提供的解决方案使用,这是,将根据Spark Java API文档在Spark 2.0中删除。在那

  • Vanilla 的 调试 除了查看 nginx 错误日志辅助开发外,为了方便 Vanilla 项目的开发和调试,Vanilla 提供了诸如 print_r 之类的对象输出方法,以及详细友好的页面报错输出,你不需要到服务器日志去查看,就能所见即所得的开发调试代码. sprint_r,print_r,lprint_r,err_log sprint_r 将 LUA 对象等格式化为易读的字符串返回 pri

  • 我正在尝试使用流数据帧将一个文件(csv.gz格式)转换为拼花地板。我必须使用流式数据帧,因为压缩的文件大小约为700 MB。作业是使用AWS EMR上的自定义jar运行的。源、目标和检查点位置都在AWS S3上。但一旦我尝试写入检查点,作业就会失败,并出现以下错误: 在EMR集群上运行的其他spark作业从S3读写并成功运行到S3(但它们不使用spark流)。所以我不认为这是S3文件系统访问的问