关于如何获得n<code>RDD</code>和/或<code>DataFrame</code>的分区数,有很多问题:答案总是:
rdd.getNumPartitions
或者
df.rdd.getNumPartitions
不幸的是,这在数据帧
上是一项代价高昂的操作,因为
df.rdd
需要从DataFrame
转换为rdd
。这是运行所需时间的顺序
df.count
我正在编写逻辑,根据当前分区数是否在可接受值范围内,或者是否低于或高于可接受值,可以选择<code>重新分区</code>或<code>合并</code>为<code>DataFrame</code>。
def repartition(inDf: DataFrame, minPartitions: Option[Int],
maxPartitions: Option[Int]): DataFrame = {
val inputPartitions= inDf.rdd.getNumPartitions // EXPENSIVE!
val outDf = minPartitions.flatMap{ minp =>
if (inputPartitions < minp) {
info(s"Repartition the input from $inputPartitions to $minp partitions..")
Option(inDf.repartition(minp))
} else {
None
}
}.getOrElse( maxPartitions.map{ maxp =>
if (inputPartitions > maxp) {
info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
inDf.coalesce(maxp)
} else inDf
}.getOrElse(inDf))
outDf
}
但是,我们不能以这种方式为每个数据帧
承担rdd.getNum分区
的成本。
是否有任何方法可以获得此信息 - 例如,通过查询在线/临时目录中
的已注册
表?
更新Spark GUI显示DataFrame.rdd操作花费的时间与作业中最长的sql一样长。我将重新运行作业,并在这里附上截图。
下面只是一个测试用例:它使用的数据大小只是生产中数据大小的一小部分。最长的<code>sql</code>只有5分钟,而这一次也将花费这么多时间(请注意,<code>sql在这里没有帮助:它还必须随后执行,从而有效地将累计执行时间加倍)。
我们可以看到 DataFrameUtils
第 30 行的 .rdd
操作(如上面的代码片段所示)需要 5.1 分钟 - 但保存
操作在 5.2 分钟后仍然需要 - 即,就后续保存的执行时间而言,我们没有通过执行 .rdd
来节省
任何时间。
根据我的经验,df.rdd.getNum分区
非常快,我从来没有遇到过超过一秒钟左右。
或者,你也可以尝试
val numPartitions: Long = df
.select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()
这将避免使用< code >。rdd
rdd中的
,因为返回的rdd
组件没有固有成本。getNumPartitionsRDD
从不求值。
虽然您可以根据经验轻松确定这一点,但可以使用调试器(我将此作为读者的练习),或者确定在基本情况下不会触发任何作业
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]
scala> ds.rdd.getNumPartitions
res0: Int = 1
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true
这可能不足以说服你。所以让我们用一种更系统的方法来解决这个问题:
>
rdd
返回一个映射分区RDD
(如上所述的 ds
):
scala> ds.rdd.getClass
res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
< code > rdd . getnumpartitions 调用< code>RDD.partitions。
在< code>rdd和源代码之间只有< code>MapPartitionsRDD。
scala> ds.rdd.toDebugString
res3: String =
(1) MapPartitionsRDD[3] at rdd at <console>:26 []
| MapPartitionsRDD[2] at rdd at <console>:26 []
| MapPartitionsRDD[1] at rdd at <console>:26 []
| FileScanRDD[0] at rdd at <console>:26 []
同样,如果Dataset
包含交换,我们将跟随父级进行最近的洗牌:
scala> ds.orderBy("value").rdd.toDebugString
res4: String =
(67) MapPartitionsRDD[13] at rdd at <console>:26 []
| MapPartitionsRDD[12] at rdd at <console>:26 []
| MapPartitionsRDD[11] at rdd at <console>:26 []
| ShuffledRowRDD[10] at rdd at <console>:26 []
+-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
| MapPartitionsRDD[5] at rdd at <console>:26 []
| FileScanRDD[4] at rdd at <console>:26 []
请注意,这种情况特别有趣,因为我们实际上触发了一个作业:
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
res5: Boolean = false
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
res6: Array[Int] = Array(0)
这是因为我们遇到过无法静态确定分区的情况(请参阅排序后的数据帧分区数和为什么 sortBy 转换会触发 Spark 作业?)。
在这种情况下,< code>getNumPartitions也会触发一个作业:
scala> ds.orderBy("value").rdd.getNumPartitions
res7: Int = 67
scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null) // Note new job id
res8: Array[Int] = Array(1, 0)
然而,这并不意味着观察到的成本在某种程度上与. rdd
调用有关。相反,它是查找分区
的内在成本,以防没有静态公式(例如,某些Hadoop输入格式,需要对数据进行完全扫描)。
请注意,此处提出的观点不应推广到< code>Dataset.rdd的其他应用。例如,< code>ds.rdd.count确实是昂贵且浪费的。
有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。
我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢
我们有没有可能在Spark中先按一列分区,然后再按另一列聚类? 在我的例子中,我在一个有数百万行的表中有一个< code>month列和一个< code>cust_id列。我可以说,当我将数据帧保存到hive表中,以便根据月份将该表分区,并按< code>cust_id将该表聚类成50个文件吗? 忽略按< code>cust_id的聚类,这里有三个不同的选项 第一种情况和最后一种情况在 Spark
我一直在使用SE上发布的问题的一个极好的答案来确定分区的数量,以及跨数据帧的分区分布需要知道数据帧Spark中的分区详细信息 有人能帮我扩展答案来确定数据帧的分区大小吗? 谢谢
我正在查看代码中的一个错误,其中一个数据框被分成了太多的分区(超过700个),当我试图将它们重新分区为48个时,这会导致太多的洗牌操作。我不能在这里使用coalesce(),因为我想在重新分区之前首先拥有更少的分区。 我正在寻找减少分区数量的方法。假设我有一个 spark 数据帧(具有多个列),分为 10 个分区。我需要根据其中一列进行 orderBy 转换。完成此操作后,生成的数据帧是否具有相同
当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?