当前位置: 首页 > 知识库问答 >
问题:

如何检查Spark数据帧的分区数而不产生?放射性散布装置

方和宜
2023-03-14

关于如何获得n<code>RDD</code>和/或<code>DataFrame</code>的分区数,有很多问题:答案总是:

 rdd.getNumPartitions

或者

 df.rdd.getNumPartitions

不幸的是,这在数据帧上是一项代价高昂的操作,因为

 df.rdd

需要从DataFrame转换为rdd。这是运行所需时间的顺序

 df.count

我正在编写逻辑,根据当前分区数是否在可接受值范围内,或者是否低于或高于可接受值,可以选择<code>重新分区</code>或<code>合并</code>为<code>DataFrame</code>。

  def repartition(inDf: DataFrame, minPartitions: Option[Int],
       maxPartitions: Option[Int]): DataFrame = {
    val inputPartitions= inDf.rdd.getNumPartitions  // EXPENSIVE!
    val outDf = minPartitions.flatMap{ minp =>
      if (inputPartitions < minp) {
        info(s"Repartition the input from $inputPartitions to $minp partitions..")
        Option(inDf.repartition(minp))
      } else {
        None
      }
    }.getOrElse( maxPartitions.map{ maxp =>
      if (inputPartitions > maxp) {
        info(s"Coalesce the input from $inputPartitions to $maxp partitions..")
        inDf.coalesce(maxp)
      } else inDf
    }.getOrElse(inDf))
    outDf
  }

但是,我们不能以这种方式为每个数据帧承担rdd.getNum分区的成本。

是否有任何方法可以获得此信息 - 例如,通过查询在线/临时目录中已注册表?

更新Spark GUI显示DataFrame.rdd操作花费的时间与作业中最长的sql一样长。我将重新运行作业,并在这里附上截图。

下面只是一个测试用例:它使用的数据大小只是生产中数据大小的一小部分。最长的<code>sql</code>只有5分钟,而这一次也将花费这么多时间(请注意,<code>sql在这里没有帮助:它还必须随后执行,从而有效地将累计执行时间加倍)。

我们可以看到 DataFrameUtils 第 30 行的 .rdd 操作(如上面的代码片段所示)需要 5.1 分钟 - 但保存操作在 5.2 分钟后仍然需要 - 即,就后续保存的执行时间而言,我们没有通过执行 .rdd节省任何时间。

共有2个答案

田阳泽
2023-03-14

根据我的经验,df.rdd.getNum分区非常快,我从来没有遇到过超过一秒钟左右。

或者,你也可以尝试

val numPartitions: Long = df
      .select(org.apache.spark.sql.functions.spark_partition_id()).distinct().count()

这将避免使用< code >。rdd

须旭
2023-03-14

rdd中的rdd组件没有固有成本。getNumPartitions,因为返回的RDD从不求值。

虽然您可以根据经验轻松确定这一点,但可以使用调试器(我将此作为读者的练习),或者确定在基本情况下不会触发任何作业

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val ds = spark.read.text("README.md")
ds: org.apache.spark.sql.DataFrame = [value: string]

scala> ds.rdd.getNumPartitions
res0: Int = 1

scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty // Check if there are any known jobs
res1: Boolean = true

这可能不足以说服你。所以让我们用一种更系统的方法来解决这个问题:

>

  • rdd 返回一个映射分区RDD(如上所述的 ds):

    scala> ds.rdd.getClass
    res2: Class[_ <: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]] = class org.apache.spark.rdd.MapPartitionsRDD
    

    < code > rdd . getnumpartitions 调用< code>RDD.partitions。

    在< code>rdd和源代码之间只有< code>MapPartitionsRDD。

    scala> ds.rdd.toDebugString
    res3: String =
    (1) MapPartitionsRDD[3] at rdd at <console>:26 []
     |  MapPartitionsRDD[2] at rdd at <console>:26 []
     |  MapPartitionsRDD[1] at rdd at <console>:26 []
     |  FileScanRDD[0] at rdd at <console>:26 []
    

    同样,如果Dataset包含交换,我们将跟随父级进行最近的洗牌:

    scala> ds.orderBy("value").rdd.toDebugString
    res4: String =
    (67) MapPartitionsRDD[13] at rdd at <console>:26 []
     |   MapPartitionsRDD[12] at rdd at <console>:26 []
     |   MapPartitionsRDD[11] at rdd at <console>:26 []
     |   ShuffledRowRDD[10] at rdd at <console>:26 []
     +-(1) MapPartitionsRDD[9] at rdd at <console>:26 []
        |  MapPartitionsRDD[5] at rdd at <console>:26 []
        |  FileScanRDD[4] at rdd at <console>:26 []
    

    请注意,这种情况特别有趣,因为我们实际上触发了一个作业:

    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null).isEmpty
    res5: Boolean = false
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)
    res6: Array[Int] = Array(0)
    

    这是因为我们遇到过无法静态确定分区的情况(请参阅排序后的数据帧分区数和为什么 sortBy 转换会触发 Spark 作业?)。

    在这种情况下,< code>getNumPartitions也会触发一个作业:

    scala> ds.orderBy("value").rdd.getNumPartitions
    res7: Int = 67
    
    scala> spark.sparkContext.statusTracker.getJobIdsForGroup(null)  // Note new job id
    res8: Array[Int] = Array(1, 0)
    

    然而,这并不意味着观察到的成本在某种程度上与. rdd调用有关。相反,它是查找分区的内在成本,以防没有静态公式(例如,某些Hadoop输入格式,需要对数据进行完全扫描)。

    请注意,此处提出的观点不应推广到< code>Dataset.rdd的其他应用。例如,< code>ds.rdd.count确实是昂贵且浪费的。

  •  类似资料:
    • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

    • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢

    • 我们有没有可能在Spark中先按一列分区,然后再按另一列聚类? 在我的例子中,我在一个有数百万行的表中有一个< code>month列和一个< code>cust_id列。我可以说,当我将数据帧保存到hive表中,以便根据月份将该表分区,并按< code>cust_id将该表聚类成50个文件吗? 忽略按< code>cust_id的聚类,这里有三个不同的选项 第一种情况和最后一种情况在 Spark

    • 我一直在使用SE上发布的问题的一个极好的答案来确定分区的数量,以及跨数据帧的分区分布需要知道数据帧Spark中的分区详细信息 有人能帮我扩展答案来确定数据帧的分区大小吗? 谢谢

    • 我正在查看代码中的一个错误,其中一个数据框被分成了太多的分区(超过700个),当我试图将它们重新分区为48个时,这会导致太多的洗牌操作。我不能在这里使用coalesce(),因为我想在重新分区之前首先拥有更少的分区。 我正在寻找减少分区数量的方法。假设我有一个 spark 数据帧(具有多个列),分为 10 个分区。我需要根据其中一列进行 orderBy 转换。完成此操作后,生成的数据帧是否具有相同

    • 当我使用Spark从S3读取多个文件时(例如,一个包含许多Parquet文件的目录)- 逻辑分区是在开始时发生,然后每个执行器直接下载数据(在worker节点上)吗?< br >还是驱动程序下载数据(部分或全部),然后进行分区并将数据发送给执行器? 此外,分区是否默认为用于写入的相同分区(即每个文件= 1个分区)?