我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html
它说:
而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么影响?
更重要的是,如果重新分区进行散列分区,那么提供列作为其参数会产生什么影响?
通过使用 df.explain
,您可以获得有关这些操作的大量信息。
我用这个数据框作为例子:
df = spark.createDataFrame([(i, f"value {i}") for i in range(1, 22, 1)], ["id", "value"])
根据是否指定了键表达式(列),分区方法会有所不同。并不总是像你说的那样是散列分区。
df.repartition(3).explain(True)
== Parsed Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
Repartition 3, true
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange RoundRobinPartitioning(3)
+- Scan ExistingRDD[id#0L,value#1]
我们可以在生成的物理计划中看到使用了RoundRobin分区
:
表示一种分区,其中通过从随机目标分区号开始并以循环方式分布行,将行均匀分布在输出分区中。实现DataFrame.repartition()运算符时会使用这种分区。
使用按列表达式重新分区时:
df.repartition(3, "id").explain(True)
== Parsed Logical Plan ==
'RepartitionByExpression ['id], 3
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
RepartitionByExpression [id#0L], 3
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange hashpartitioning(id#0L, 3)
+- Scan ExistingRDD[id#0L,value#1]
现在选择的分区方法是hashpartitioning
。在散列分区方法中,Java<code>Object。正在为每个键表达式计算hashCode,以通过计算modulo:键来确定目标
。partition_id
。hashCode%numPartitions
此分区方法根据分区
键创建连续且不重叠的值范围。因此,至少需要一个键表达式,并且需要可排序。
df.repartitionByRange(3, "id").explain(True)
== Parsed Logical Plan ==
'RepartitionByExpression ['id ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Analyzed Logical Plan ==
id: bigint, value: string
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Optimized Logical Plan ==
RepartitionByExpression [id#0L ASC NULLS FIRST], 3
+- LogicalRDD [id#0L, value#1], false
== Physical Plan ==
Exchange rangepartitioning(id#0L ASC NULLS FIRST, 3)
+- Scan ExistingRDD[id#0L,value#1]
查看生成的物理计划,我们可以看到范围分区与上述其他两个分区的不同之处在于分区
表达式中存在排序子句。当表达式中未指定显式排序顺序时,默认情况下使用升序。
我认为最好通过一些实验来研究这种差异。
对于这个实验,我使用了以下两个Dataframes(我在Scala中显示代码,但概念与Python API相同):
// Dataframe with one column "value" containing the values ranging from 0 to 1000000
val df = Seq(0 to 1000000: _*).toDF("value")
// Dataframe with one column "value" containing 1000000 the number 0 in addition to the numbers 5000, 10000 and 100000
val df2 = Seq((0 to 1000000).map(_ => 0) :+ 5000 :+ 10000 :+ 100000: _*).toDF("value")
>
重新分区
在提供了一列或多列时应用哈希分区器
,在没有提供列时应用RoundRobin分区器
。如果提供了一列或多列(哈希分区器),这些值将被散列并用于通过计算类似分区=哈希(列)%NumberOftions
来确定分区号。如果没有提供列(RoundRobin分区器),数据将均匀分布在指定数量的分区上。
重新分区ByRange
将根据列值的范围对数据进行分区。这通常用于连续(而非离散)值,例如任何类型的数字。请注意,由于性能原因,此方法使用采样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以通过配置spark.sql.execution.rangeExchange.sampleSizePer的分区
来控制。
还值得一提的是,对于这两种方法,如果未给出 numPartitions
,默认情况下,它会将数据帧数据分区为 Spark 会话中配置的 spark.sql.shuffle.partitions
,并且可以通过自适应查询执行(从 Spark 3.x 开始可用)合并。
根据给定的Testdata,我总是应用相同的代码:
val testDf = df
// here I will insert the partition logic
.withColumn("partition", spark_partition_id()) // applying SQL built-in function to determine actual partition
.groupBy(col("partition"))
.agg(
count(col("value")).as("count"),
min(col("value")).as("min_value"),
max(col("value")).as("max_value"))
.orderBy(col("partition"))
testDf.show(false)
正如预期的那样,我们得到了4个分区,因为< code>df的值的范围是从0到1000000,我们看到它们的散列值将产生分布良好的数据帧。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |249911|12 |1000000 |
|1 |250076|6 |999994 |
|2 |250334|2 |999999 |
|3 |249680|0 |999998 |
+---------+------+---------+---------+
同样在这种情况下,我们得到4个分区,但这次最小值和最大值清楚地显示了一个分区内的值范围。它几乎平均分布,每个分区有250000个值。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |244803|0 |244802 |
|1 |255376|244803 |500178 |
|2 |249777|500179 |749955 |
|3 |250045|749956 |1000000 |
+---------+------+---------+---------+
现在,我们正在使用另一个Dataframedf2
。在这里,散列算法正在散列只有0、5000、10000或100000的值。当然,值0的散列将始终相同,因此所有Zeros最终都在同一个分区中(在本例中为分区3)。其他两个分区只包含一个值。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1 |100000 |100000 |
|1 |1 |10000 |10000 |
|2 |1 |5000 |5000 |
|3 |1000001|0 |0 |
+---------+-------+---------+---------+
如果不使用列“value”的内容,重新分区
方法将在RoundRobin的基础上分发消息。所有分区的数据量几乎相同。
+---------+------+---------+---------+
|partition|count |min_value|max_value|
+---------+------+---------+---------+
|0 |250002|0 |5000 |
|1 |250002|0 |10000 |
|2 |249998|0 |100000 |
|3 |250002|0 |0 |
+---------+------+---------+---------+
这个案例表明,Dataframedf2
没有很好地定义用于按范围重新分区,因为几乎所有值都是0。因此,我们最终只有两个分区,而分区0包含所有零。
+---------+-------+---------+---------+
|partition|count |min_value|max_value|
+---------+-------+---------+---------+
|0 |1000001|0 |0 |
|1 |3 |5000 |100000 |
+---------+-------+---------+---------+
根据Spark 1.6.3的文档,应该保留结果数据表中的分区数: 返回由给定分区表达式分区的新DataFrame,保留现有的分区数 Edit:这个问题并不涉及在Apache Spark中删除空DataFrame分区的问题(例如,如何在不产生空分区的情况下沿列重新分区),而是为什么文档所说的内容与我在示例中观察到的内容不同
所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。 举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码: 然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按
假设我有一个1.2 GB的文件,那么考虑到128 MB的块大小,它将创建10个分区。现在,如果我将其重新分区(或合并)为4个分区,这意味着每个分区肯定会超过128 MB。在这种情况下,每个分区必须容纳320 MB的数据,但块大小是128 MB。我有点糊涂了。这怎么可能?我们如何创建一个大于块大小的分区?
我试图优化两个spark dataframes之间的联接查询,让我们将它们称为df1、df2(在公共列“saleid”上联接)。df1非常小(5M),所以我在spark集群的节点中广播它。df2非常大(200米行),所以我尝试通过“saleid”对它进行桶/重新分区。 例如: 分区: 水桶: 我不知道哪一个是正确的技术使用。谢谢。
我有一个大约 100GB 的数据源,我正在尝试使用日期列对其进行分区。 为了避免分区内出现小块,我添加了一个重新分区(5 ),使每个分区内最多有5个文件: 我的问题是,在我分配的30个执行器中,只有5个在实际运行。最后我得到了我想要的东西(每个分区内有5个文件),但由于只有5个执行器在运行,所以执行时间非常长。 你有什么建议可以让我做得更快吗?
我是Spark的新手,有一个1 TB的文件需要处理。 我的系统规格是: 每个节点:64 GB RAM 节点数:2 每个节点的核心:5 正如我所知,我必须重新分区数据以获得更好的并行性,因为火花将尝试仅通过(核心总数*2或3或4)创建默认分区。但在我的情况下,由于数据文件非常大,我必须将这些数据重新分区为一个数字,以便这些数据可以以有效的方式处理。 如何选择要在重新分区中传递的分区数??我应该如何计