当前位置: 首页 > 知识库问答 >
问题:

Spark:重新分区和重新分区ByRange有什么区别?

巫朝明
2023-03-14

我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html

它说:

  • 重新分区:生成的DataFrame是哈希分区的
  • 对于repartitionByRange:结果DataFrame是范围分区的

而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么影响?

更重要的是,如果重新分区进行散列分区,那么提供列作为其参数会产生什么影响?

共有2个答案

端木渝
2023-03-14

通过使用 df.explain ,您可以获得有关这些操作的大量信息。

我用这个数据框作为例子:

df = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])

根据是否指定了键表达式(列),分区方法会有所不同。并不总是像你说的那样是散列分区。

df.repartition(3).explain(True)

== Parsed Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- Scan ExistingRDD[id#0L,value#1]

我们可以在生成的物理计划中看到使用了RoundRobin分区

表示一种分区,其中通过从随机目标分区号开始并以循环方式分布行,将行均匀分布在输出分区中。实现DataFrame.repartition()运算符时会使用这种分区。

使用按列表达式重新分区时:

df.repartition(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange hashpartitioning(id#0L, 3)
+- Scan ExistingRDD[id#0L,value#1]

现在选择的分区方法是hashpartitioning。在散列分区方法中,Java<code>Object。正在为每个键表达式计算hashCode,以通过计算modulo:键来确定目标partition_id。hashCode%numPartitions

此分区方法根据分区键创建连续且不重叠的值范围。因此,至少需要一个键表达式,并且需要可排序。

df.repartitionByRange(3, "id").explain(True)

== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Optimized Logical Plan ==
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false

== Physical Plan ==
Exchange rangepartitioning(id#0L ASC NULLS FIRST, 3)
+- Scan ExistingRDD[id#0L,value#1]

查看生成的物理计划,我们可以看到范围分区与上述其他两个分区的不同之处在于分区表达式中存在排序子句。当表达式中未指定显式排序顺序时,默认情况下使用升序。

  • 重新分区逻辑运算符-重新分区和重新分区By表达式
  • Apache SparkSQL中的范围分区
  • 哈希与范围分区
岳京
2023-03-14

我认为最好通过一些实验来研究这种差异。

对于这个实验,我使用了以下两个Dataframes(我在Scala中显示代码,但概念与Python API相同):

// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")

// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")

>

  • 重新分区在提供了一列或多列时应用哈希分区器,在没有提供列时应用RoundRobin分区器。如果提供了一列或多列(哈希分区器),这些值将被散列并用于通过计算类似分区=哈希(列)%NumberOftions来确定分区号。如果没有提供列(RoundRobin分区器),数据将均匀分布在指定数量的分区上。

    重新分区ByRange将根据列值的范围对数据进行分区。这通常用于连续(而非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以通过配置spark.sql.execution.rangeExchange.sampleSizePer的分区来控制。

    还值得一提的是,对于这两种方法,如果未给出 numPartitions,默认情况下,它会将数据帧数据分区为 Spark 会话中配置的 spark.sql.shuffle.partitions,并且可以通过自适应查询执行(从 Spark 3.x 开始可用)合并。

    根据给定的Testdata,我总是应用相同的代码:

    val testDf = df
    // here I will insert the partition logic
        .withColumn("partition", spark_partition_id()) // applying SQL built-in function to determine actual partition
        .groupBy(col("partition"))
        .agg(
          count(col("value")).as("count"),
          min(col("value")).as("min_value"),
          max(col("value")).as("max_value"))
        .orderBy(col("partition"))
    
    testDf.show(false)
    

    正如预期的那样,我们得到了4个分区,因为< code>df的值的范围是从0到1000000,我们看到它们的散列值将产生分布良好的数据帧。

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |249911|12       |1000000  |
    |1        |250076|6        |999994   |
    |2        |250334|2        |999999   |
    |3        |249680|0        |999998   |
    +---------+------+---------+---------+
    

    同样在这种情况下,我们得到4个分区,但这次最小值和最大值清楚地显示了一个分区内的值范围。它几乎平均分布,每个分区有250000个值。

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |244803|0        |244802   |
    |1        |255376|244803   |500178   |
    |2        |249777|500179   |749955   |
    |3        |250045|749956   |1000000  |
    +---------+------+---------+---------+
    

    现在,我们正在使用另一个Dataframedf2。在这里,散列算法正在散列只有0、5000、10000或100000的值。当然,值0的散列将始终相同,因此所有Zeros最终都在同一个分区中(在本例中为分区3)。其他两个分区只包含一个值。

    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1      |100000   |100000   |
    |1        |1      |10000    |10000    |
    |2        |1      |5000     |5000     |
    |3        |1000001|0        |0        |
    +---------+-------+---------+---------+
    

    如果不使用列“value”的内容,重新分区方法将在RoundRobin的基础上分发消息。所有分区的数据量几乎相同。

    +---------+------+---------+---------+
    |partition|count |min_value|max_value|
    +---------+------+---------+---------+
    |0        |250002|0        |5000     |
    |1        |250002|0        |10000    |
    |2        |249998|0        |100000   |
    |3        |250002|0        |0        |
    +---------+------+---------+---------+
    

    这个案例表明,Dataframedf2没有很好地定义用于按范围重新分区,因为几乎所有值都是0。因此,我们最终只有两个分区,而分区0包含所有零。

    +---------+-------+---------+---------+
    |partition|count  |min_value|max_value|
    +---------+-------+---------+---------+
    |0        |1000001|0        |0        |
    |1        |3      |5000     |100000   |
    +---------+-------+---------+---------+
    

  •  类似资料:
    • 根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同

    • 所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。 举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码: 然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按

    • 假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?

    • 我试图优化两个spark dataframes之间的联接查询,让我们将它们称为df1、df2(在公共列“saleid”上联接)。df1非常小(5M),所以我在spark集群的节点中广播它。df2非常大(200米行),所以我尝试通过“saleid”对它进行桶/重新分区。 例如: 分区: 水桶: 我不知道哪一个是正确的技术使用。谢谢。

    • 我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。 为了避免分区内出现小块,我添加了一个重新分区(5 ),使每个分区内最多有5个文件: 我的问题是,在我分配的30个执行器中,只有5个在实际运行。最后我得到了我想要的东西(每个分区内有5个文件),但由于只有5个执行器在运行,所以执行时间非常长。 你有什么建议可以让我做得更快吗?

    • 我是Spark的新手,有一个1 TB的文件需要处理。 我的系统规格是: 每个节点:64 GB RAM 节点数:2 每个节点的核心:5 正如我所知,我必须重新分区数据以获得更好的并行性,因为火花将尝试仅通过(核心总数*2或3或4)创建默认分区。但在我的情况下,由于数据文件非常大,我必须将这些数据重新分区为一个数字,以便这些数据可以以有效的方式处理。 如何选择要在重新分区中传递的分区数??我应该如何计