我是scala/sark世界的新手,最近开始了一项任务,它读取一些数据,处理数据并将其保存在S3上。我阅读了一些关于stackoverflow的主题/问题,这些主题/问题涉及重分区/合并性能和最佳分区数(如本例)。假设我有正确的分区数,我的问题是,在将rdd转换为数据帧时,对它进行重新分区是个好主意吗?下面是我的代码目前的样子:
val dataRdd = dataDf.rdd.repartition(partitions)
.map(x => ThreadedConcurrentContext.executeAsync(myFunction(x)))
.mapPartitions( it => ThreadedConcurrentContext.awaitSliding(it = it, batchSize = asyncThreadsPerTask, timeout = Duration(3600000, "millis")))
val finalDf = dataRdd
.filter(tpl => tpl._3 != "ERROR")
.toDF()
这是我打算做的(过滤后重新分区数据):
val finalDf = dataRdd
.filter(tpl => tpl._3 != "ERROR")
.repartition(partitions)
.toDF()
我的问题是,这样做是个好主意吗?这里有性能提升吗?
注1:过滤器通常会删除10%的原始数据。
注2:这是我用来运行上述代码的spark-submit命令的第一部分:
spark-submit --master yarn --deploy-mode client --num-executors 4 --executor-cores 4 --executor-memory 2G --driver-cores 4 --driver-memory 2G
问题的答案取决于dataRdd
的大小、分区数、执行器核
以及HDFS集群的处理能力。
考虑到这一点,您应该使用分区
的不同值在集群上运行一些测试,并完全删除repartition
,以便对其进行微调并找到准确的答案。
例如-如果您指定分区=8
和执行器-核心=4
,那么您将充分利用所有核心,但是如果您的dataRdd
的大小只有1GB,那么重新分区没有任何优势,因为它会触发shuffle
,从而影响性能。此外,如果您的HDFS集群的处理能力较低或负载较重,那么会因此产生额外的性能开销。
如果您的HDFS集群上有足够的可用资源,并且您有一个大的(比如超过100GB) dataRDD
,那么< code >重新分区应该有助于提高上面示例中配置值的性能。
我正在尝试将RDD[String]转换为数据框。字符串是逗号分隔的,所以我希望逗号之间的每个值都有一列。为此,我尝试了以下步骤: 但我明白了: 这不是这篇文章的副本(如何将rdd对象转换为火花中的数据帧),因为我要求RDD[字符串]而不是RDD[行]。 而且它也不是火花加载CSV文件作为DataFrame的副本?因为这个问题不是关于将CSV文件读取为DataFrame。
RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:
我有一个如下的CSV文件。 我想把这个转化成下面。 基本上,我想在输出数据帧中创建一个名为idx的新列,该列将填充与键=idx,value=“n”后面的行相同的值“n”。
我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码:
我尝试使用以下代码获取数据帧的分区数量: 按照我的理解,dataframe通过元数据给rdd增加了一个结构层。那么,为什么在转换成rdd时要花这么多时间呢?
来自Spark源代码: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2972 可能需要与首先计算 所需的时间一样长。因此,这使得诸如 非常贵。假设< code>DataFrame是< code>DataSet[Row],而< code>