当前位置: 首页 > 知识库问答 >
问题:

为什么在重新划分Spark数据帧时会有这么多空分区?

倪棋
2023-03-14

我想将数据帧“df1”划分为3列。此数据帧正好有990个针对这3列的唯一组合:

In [17]: df1.createOrReplaceTempView("df1_view")

In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+                                                                      
|count(1)|
+--------+
|     990|
+--------+

为了优化这个数据帧的处理,我想对df1进行分区,以获得990个分区,每个分区对应一个密钥:

In [19]: df1.rdd.getNumPartitions()
Out[19]: 24

In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")

In [21]: df2.rdd.getNumPartitions()
Out[21]: 990

我写了一个简单的方法来计算每个分区中的行数:

In [22]: def f(iterator):
    ...:     a = 0
    ...:     for partition in iterator:
    ...:         a = a + 1
    ...:     print(a)
    ...: 

In [23]: df2.foreachPartition(f)

我注意到,实际上我得到的是628个带有一个或多个键值的分区,以及362个空分区。

我假设spark会以一种均匀的方式(1个键值=1个分区)重新分区,但这似乎不是这样,我觉得这种重新分区增加了数据倾斜,尽管它应该是另一种方式。。。

Spark用于在列上划分数据帧的算法是什么?有没有办法实现我认为可能实现的目标?

我在Cloudera上使用Spark 2.2.0。

共有1个答案

子车灿
2023-03-14

要跨分区分发数据,Spark需要以某种方式将列的值转换为分区的索引。Spark中有两个缺省分区-Hash分区器和Range分区器。Spark中的不同转换可以应用不同的分区-例如连接将应用哈希分区。

基本上,对于散列分区器来说,将值转换为分区索引的公式应该是value。hashCode()%numOfPartitions。在您的情况下,多个值映射到同一个分区索引。

如果想要更好的分发,可以实现自己的分区器。更多关于它的信息就在这里,这里,这里。

 类似资料:
  • 我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100

  • 所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。 举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码: 然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按

  • 我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么

  • 我是scala/sark世界的新手,最近开始了一项任务,它读取一些数据,处理数据并将其保存在S3上。我阅读了一些关于stackoverflow的主题/问题,这些主题/问题涉及重分区/合并性能和最佳分区数(如本例)。假设我有正确的分区数,我的问题是,在将rdd转换为数据帧时,对它进行重新分区是个好主意吗?下面是我的代码目前的样子: 这是我打算做的(过滤后重新分区数据): 我的问题是,这样做是个好主意

  • 根据这么多好的资源,建议在过滤操作后重新划分一个RDD。因为,有可能大多数分区现在都是空的。我有一个疑问,在数据帧的情况下,这在当前版本中已经被处理了吗,或者我们仍然需要在过滤操作之后重新划分它吗?

  • 我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢