当前位置: 首页 > 知识库问答 >
问题:

处理dataskew而不加盐的连接键在火花

乜华翰
2023-03-14

我正在尝试将一个100万行数据帧与一个30行数据帧进行内部连接,这两个表都有相同的连接键,spark正在尝试执行排序合并连接,因此我的所有数据最终都在同一个执行器中,例如,Job从未完成

DF1(million rows dataframe registered as TempView DF1)
+-------+-----------+
|   id  |  price    | 
+-------+-----------+
|    1  |   30      |
|    1  |   10      |
|    1  |   12      |
|    1  |   15      |
+-------+-----------+
DF2(30 rows dataframe registered as TempView DF2)
+-------+-----------+
|   id  |  Month    | 
+-------+-----------+
|    1  |   Jan     |
|    1  |   Feb     |
+-------+-----------+

我试着跟随

广播

spark.sql("Select /*+ BROADCAST(Df2) */ Df1.* from Df1 inner join Df2 on Df1.id=Df2.id").createTempView("temp")

已重新分区

Df1.repartition(200)

查询执行计划

00 Project [.......................]
01 +- SortMergeJoin [.............................],Inner
02    :- Project [.............................]
03    :  +-Filter is notnull[JoinKey]
04    :    +- FileScan orc[..........................]
05    +-Project [.............................]
06      +-BroadcastHashJoin [..........................], LeftOuter, BuildRight
07        :- BroadCastHashJoin [......................],LeftSemi, BuildRight

分区数的输出

spark.table("temp").withColumn("partition_id",spark_partition_id).groupBy
("partition_id").count
+-------+---------------+
|    21 |300,00,000     |
+-------+---------------+

即使我重新分区/广播数据,火花在加入时将所有数据带到一个执行器,数据在一个执行器上会发生倾斜。我还尝试将sortMergeJoinspark.sql.join.prefer关闭为false。但是我仍然看到我的数据在一个执行器上发生倾斜。有人能帮我吗?

共有2个答案

陆宏扬
2023-03-14
  1. 为什么所有数据都要移动到一个执行器?如果您在DF1中只有相同的id(id:1),并使用id连接DF2。根据HashPartitioner,id为1的数据将始终一起移动。
  2. 你有广播加入吗?在spark UI中检查
哈栋
2023-03-14

只是这样做,它工作正常。数据按原样,没有分区。

 import org.apache.spark.sql.functions.broadcast
 // Simulate some data
 val df1 = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
 val df2 = spark.range(30).rdd.map(x => (1, "yyy")).toDF("one", "val2")
 // Data is as is, has no partitioning applied

 val df3 = df1.join(broadcast(df2), "one")  
 df3.count // An action to kick it all along

 // Look at final counts of partitions
 val rddcounts = df3.rdd.mapPartitions(iter => Array(iter.size).iterator, true) 
 rddcounts.collect

返回

res26: Array[Int] = Array(3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000)

这依赖于CE Databricks集群上的默认并行度8。

广播应该工作在任何情况下,因为小表是小。

即使这样:

val df = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
val df1 = df.repartition(50)

它与 50 个分区并行工作。这是轮循机制分区,这意味着群集将获得分布在至少具有 N 个执行程序的 N 个工作线程上的分区。它不是散列的,如果所有值都相同,则通过指定导致偏度的列来调用哈希。即 1 个 Worker 上所有数据的相同分区。

QED:所以,并不是所有人都只在一个执行器上工作,除非您只有一个用于Spark应用程序或哈希的执行器。

之后,我在我的实验性笔记本电脑上运行了local[4],数据由4个内核提供服务,因此可以说是4个执行器。无盐渍,平行4。所以,奇怪的是你无法得到它,除非你散列。

您可以看到4个并行任务,因此,如果在一个真实的集群上,并不是所有的任务都在一个执行器上。

 类似资料:
  • 我想问一下我在火花工作中遇到的数据偏斜问题。我知道如果你有数据偏差,最佳实践技巧之一是做盐渍技术。在我的 Spark 作业中,我必须在两个数据帧之间执行联接(其中一个数据帧的大小约为 5 GB)。 此外,我通常用一个数字和我用来连接的字段重新分区,以便在连接之前尽可能多地控制分区。因此,我使用shuffle将连接转换为具有窄依赖关系的映射连接。 场景是: 我有24个执行器,每个执行器有4个核心 我

  • 我建立了一个数据库连接。然后,我尝试建立连接,如果出现异常,则返回。在代码的下面,我有一个Finally块,我打算用它作为一个catch all来关闭连接。 然而,如果初始连接失败,它将跳转到我的con所在的最后一个块。close()引发空指针异常。最好的办法是什么?有没有办法测试con是否为null?我试过if(con.isValid(0))和con。等于(null)和con==null,它们都

  • 问题内容: 我正在使用EF 6.1.0 我在下面将自定义DBContex对象作为DBEntites 我对上下文对象执行以下操作 但是在放置上下文对象之后,我仍然可以看到一个活动的数据库连接。在连接状态下,我可以看到该连接已经关闭(该连接从未为真)。 我正在使用以下查询来查看SQL上的连接。 在下面的语句中,增加了sql连接计数。但是即使处置了它也从未失败过。(我的意思是在使用块计算后,它应该关闭连

  • 我正在研究建立一个JDBC Spark连接,以便从r/Python使用。我知道和都是可用的,但它们似乎更适合交互式分析,特别是因为它们为用户保留了集群资源。我在考虑一些更类似于Tableau ODBC Spark connection的东西--一些更轻量级的东西(据我所知),用于支持简单的随机访问。虽然这似乎是可能的,而且有一些文档,但(对我来说)JDBC驱动程序的需求是什么并不清楚。 既然Hiv

  • 我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查