我想将数据帧“df1”划分为3列。此数据帧正好有990个针对这3列的唯一组合:
In [17]: df1.createOrReplaceTempView("df1_view")
In [18]: spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()
+--------+
|count(1)|
+--------+
| 990|
+--------+
为了优化这个数据帧的处理,我想对df1进行分区,以获得990个分区,每个分区对应一个密钥:
In [19]: df1.rdd.getNumPartitions()
Out[19]: 24
In [20]: df2 = df1.repartition(990, "col1", "col2", "col3")
In [21]: df2.rdd.getNumPartitions()
Out[21]: 990
我写了一个简单的方法来计算每个分区中的行数:
In [22]: def f(iterator):
...: a = 0
...: for partition in iterator:
...: a = a + 1
...: print(a)
...:
In [23]: df2.foreachPartition(f)
我注意到,实际上我得到的是628个带有一个或多个键值的分区,以及362个空分区。
我假设spark会以一种均匀的方式(1个键值=1个分区)重新分区,但这似乎不是这样,我觉得这种重新分区增加了数据倾斜,尽管它应该是另一种方式。。。
Spark用于在列上划分数据帧的算法是什么?有没有办法实现我认为可能实现的目标?
我在Cloudera上使用Spark 2.2.0。
要跨分区分发数据,Spark需要以某种方式将列的值转换为分区的索引。Spark中有两个缺省分区-Hash分区器和Range分区器。Spark中的不同转换可以应用不同的分区-例如连接
将应用哈希分区。
基本上,对于散列分区器来说,将值转换为分区索引的公式应该是value。hashCode()%numOfPartitions
。在您的情况下,多个值映射到同一个分区索引。
如果想要更好的分发,可以实现自己的分区器。更多关于它的信息就在这里,这里,这里。
我有一个包含100个分区的df,在保存到HDFS之前,我想减少分区的数量,因为拼花文件太小了( 它可以工作,但将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时: 这个过程一点也不慢,每个文件2-3秒。 为什么?在减少分区数量时,合并不应该总是更快,因为它避免了完全洗牌吗? 背景: 我将文件从本地存储导入spark集群,并将生成的数据帧保存为拼花文件。每个文件大约100
所以问题是在主题中。我认为我没有正确理解重新分区的工作。在我的脑海中,当我说时,我希望所有数据都将在工作人员(假设60个工作人员)之间按相等的大小进行分区。 举个例子。我会在不平衡的文件中加载大量数据,比如400个文件,其中20%的文件大小为2Gb,其他80%的文件大小约为1Mb。我有加载此数据的代码: 然后,我希望将原始数据转换为中间对象,过滤不相关的记录,转换为最终对象(带有附加属性),然后按
我在这里浏览了文档:https://spark . Apache . org/docs/latest/API/python/py spark . SQL . html 它说: 重新分区:生成的DataFrame是哈希分区的 对于repartitionByRange:结果DataFrame是范围分区的 而且之前的一个问题也提到了。然而,我仍然不明白它们到底有什么不同,当选择一个而不是另一个时会有什么
我是scala/sark世界的新手,最近开始了一项任务,它读取一些数据,处理数据并将其保存在S3上。我阅读了一些关于stackoverflow的主题/问题,这些主题/问题涉及重分区/合并性能和最佳分区数(如本例)。假设我有正确的分区数,我的问题是,在将rdd转换为数据帧时,对它进行重新分区是个好主意吗?下面是我的代码目前的样子: 这是我打算做的(过滤后重新分区数据): 我的问题是,这样做是个好主意
根据这么多好的资源,建议在过滤操作后重新划分一个RDD。因为,有可能大多数分区现在都是空的。我有一个疑问,在数据帧的情况下,这在当前版本中已经被处理了吗,或者我们仍然需要在过滤操作之后重新划分它吗?
我需要使用 spark-sql 加载一个 Hive 表,然后对其运行一些机器学习算法。我是这样写的: 它工作得很好,但如果我想增加数据集数据帧的分区数,我该怎么做?使用普通RDD,我可以写: 我想要有N个分区。 谢谢