当前位置: 首页 > 知识库问答 >
问题:

排序后的数据帧分区数?

壤驷向明
2023-03-14

spark如何在使用< code>orderBy后确定分区的数量?我一直以为生成的数据帧有< code > spark . SQL . shuffle . partitions ,但这似乎不是真的:

val df = (1 to 10000).map(i => ("a",i)).toDF("n","i").repartition(10).cache

df.orderBy($"i").rdd.getNumPartitions // = 200 (=spark.sql.shuffle.partitions)
df.orderBy($"n").rdd.getNumPartitions // = 2 

在这两种情况下,spark都< code >-Exchange range partitioning(I/n ASC NULLS FIRST,200),那么第二种情况下的分区数怎么会是2呢?

共有2个答案

罗伟兆
2023-03-14

我进行了各种测试,以便从经验上更仔细地看待这个问题,除了查看排序的范围分区 - 这是这里问题的症结所在。请参阅范围分区程序如何在 Spark 中工作?。

在尝试了“n”的 1 个不同值(如问题中的示例)和 1 个以上的“n”不同值之后,然后使用 df.orderBy($“n”) 的各种数据帧大小:

  • 很明显,用于确定分区数的计算将包含随后通过map分区进行排序的数据范围,
  • 它基于在计算这些计算范围的一些启发式最佳分区数之前从现有分区中采样,
  • 在大多数情况下,将计算并生成 N 1 个分区,从而将 N 1 分区为空。

分配的额外分区几乎总是空的这一事实使我认为编码中存在某种计算错误,换句话说,恕我直言,这是一个小错误。

我基于以下简单测试,该测试确实返回了我怀疑会认为是正确的分区数的RR:

val df_a1 = (1 to 1).map(i => ("a",i)).toDF("n","i").cache
val df_a2 = (1 to 1).map(i => ("b",i)).toDF("n","i").cache
val df_a3 = (1 to 1).map(i => ("c",i)).toDF("n","i").cache
val df_b = df_a1.union(df_a2)
val df_c = df_b.union(df_a3)

df_c.orderBy($"n")
 .rdd
 .mapPartitionsWithIndex{case (i,rows) => Iterator((i,rows.size))}
 .toDF("partition_number","number_of_records")
 .show(100,false)

退货:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |1                |
|1               |1                |
|2               |1                |
+----------------+-----------------+

这个边界示例计算相当简单。只要我对任何一个“N”使用1到2或1…N,就会产生额外的空分区:

+----------------+-----------------+
|partition_number|number_of_records|
+----------------+-----------------+
|0               |2                |
|1               |1                |
|2               |1                |
|3               |0                |
+----------------+-----------------+

排序要求给定“n”或一组“n”的所有数据位于同一分区中。

沈飞翼
2023-03-14

< code > spark . SQL . shuffle . partitions 用作上限。分区的最终数量是< code>1

正如您所提到的,Spark中的排序经过<code>RangePartitioner</code>。它试图实现的是将数据集划分为大致相等范围的指定数量(spark.sql.shuffle.partition)。

可以保证分区后相同的值会在同一个分区中。值得检查Range分区(不是公共API的一部分)类留档:

...

< code>ordering中的表达式计算出相同值的所有行将位于同一分区中

如果不同排序值的数量小于所需的分区数量,即可能范围的数量小于<code>spark.sql.shuffle。分区,您将得到较少的分区数。此外,这里引用了<code>RangePartitioner</code>Scaladoc的一句话:

在采样记录数小于分区值的情况下,范围分区程序创建的实际分区数可能与 partitions 参数不同。

回到您的示例,n 是一个常量 (“a”),无法进行分区。另一方面,我可以有10,000个可能的值,并被分区为200(=spark.sql.shuffle.partition)范围或分区。

请注意,这仅适用于DataFrame/Dataset API。使用RDD的sortByKey时,可以显式指定分区数,也可以使用Spark当前的分区数。

另请参阅:

  • Spark如何实现排序顺序?
 类似资料:
  • 有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。

  • 我正在查看代码中的一个错误,其中一个数据框被分成了太多的分区(超过700个),当我试图将它们重新分区为48个时,这会导致太多的洗牌操作。我不能在这里使用coalesce(),因为我想在重新分区之前首先拥有更少的分区。 我正在寻找减少分区数量的方法。假设我有一个 spark 数据帧(具有多个列),分为 10 个分区。我需要根据其中一列进行 orderBy 转换。完成此操作后,生成的数据帧是否具有相同

  • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢

  • [新加入Spark]语言-Scala 根据文档,RangePartitioner对元素进行排序并将其划分为块,然后将块分发到不同的机器。下面的例子说明了它是如何工作的。 假设我们有一个数据框,有两列,一列(比如“a”)的连续值从1到1000。还有另一个数据帧具有相同的模式,但对应的列只有4个值30、250、500、900。(可以是任意值,从1到1000中随机选择) 如果我使用RangePartit

  • 我们有没有可能在Spark中先按一列分区,然后再按另一列聚类? 在我的例子中,我在一个有数百万行的表中有一个< code>month列和一个< code>cust_id列。我可以说,当我将数据帧保存到hive表中,以便根据月份将该表分区,并按< code>cust_id将该表聚类成50个文件吗? 忽略按< code>cust_id的聚类,这里有三个不同的选项 第一种情况和最后一种情况在 Spark

  • 最近,我一直在开发一些代码来读取csv文件并将关键数据列存储在数据框中。之后,我计划对数据帧中的某些列执行一些数学函数。 我已经相当成功地在数据框中存储了正确的列。我已经能够让它做任何数学是必要的,如求和,添加数据框列,平均等。 我的问题在于,一旦特定列存储在数据帧中,就要访问它们。我正在使用一个测试文件,以使一切正常工作,并没有问题地处理了这个问题。当我打开一个不同的csv文件时会出现问题,它会