我正在尝试将一个100万行数据帧与一个30行数据帧进行内部连接,这两个表都有相同的连接键,spark正在尝试执行排序合并连接,因此我的所有数据最终都在同一个执行器中,例如,Job从未完成
DF1(million rows dataframe registered as TempView DF1)
+-------+-----------+
| id | price |
+-------+-----------+
| 1 | 30 |
| 1 | 10 |
| 1 | 12 |
| 1 | 15 |
+-------+-----------+
DF2(30 rows dataframe registered as TempView DF2)
+-------+-----------+
| id | Month |
+-------+-----------+
| 1 | Jan |
| 1 | Feb |
+-------+-----------+
我试着跟随
广播
spark.sql("Select /*+ BROADCAST(Df2) */ Df1.* from Df1 inner join Df2 on Df1.id=Df2.id").createTempView("temp")
已重新分区
Df1.repartition(200)
查询执行计划
00 Project [.......................]
01 +- SortMergeJoin [.............................],Inner
02 :- Project [.............................]
03 : +-Filter is notnull[JoinKey]
04 : +- FileScan orc[..........................]
05 +-Project [.............................]
06 +-BroadcastHashJoin [..........................], LeftOuter, BuildRight
07 :- BroadCastHashJoin [......................],LeftSemi, BuildRight
分区数的输出
spark.table("temp").withColumn("partition_id",spark_partition_id).groupBy
("partition_id").count
+-------+---------------+
| 21 |300,00,000 |
+-------+---------------+
即使我重新分区/广播数据,火花在加入时将所有数据带到一个执行器,数据在一个执行器上会发生倾斜。我还尝试将sortMergeJoinspark.sql.join.prefer关闭为false。但是我仍然看到我的数据在一个执行器上发生倾斜。有人能帮我吗?
只是这样做,它工作正常。数据按原样,没有分区。
import org.apache.spark.sql.functions.broadcast
// Simulate some data
val df1 = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
val df2 = spark.range(30).rdd.map(x => (1, "yyy")).toDF("one", "val2")
// Data is as is, has no partitioning applied
val df3 = df1.join(broadcast(df2), "one")
df3.count // An action to kick it all along
// Look at final counts of partitions
val rddcounts = df3.rdd.mapPartitions(iter => Array(iter.size).iterator, true)
rddcounts.collect
返回:
res26: Array[Int] = Array(3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000, 3750000)
这依赖于CE Databricks集群上的默认并行度8。
广播应该工作在任何情况下,因为小表是小。
即使这样:
val df = spark.range(1000000).rdd.map(x => (1, "xxx")).toDF("one", "val")
val df1 = df.repartition(50)
它与 50 个分区并行工作。这是轮循机制分区,这意味着群集将获得分布在至少具有 N 个执行程序的 N 个工作线程上的分区。它不是散列的,如果所有值都相同,则通过指定导致偏度的列来调用哈希。即 1 个 Worker 上所有数据的相同分区。
QED:所以,并不是所有人都只在一个执行器上工作,除非您只有一个用于Spark应用程序或哈希的执行器。
之后,我在我的实验性笔记本电脑上运行了local[4],数据由4个内核提供服务,因此可以说是4个执行器。无盐渍,平行4。所以,奇怪的是你无法得到它,除非你散列。
您可以看到4个并行任务,因此,如果在一个真实的集群上,并不是所有的任务都在一个执行器上。
我想问一下我在火花工作中遇到的数据偏斜问题。我知道如果你有数据偏差,最佳实践技巧之一是做盐渍技术。在我的 Spark 作业中,我必须在两个数据帧之间执行联接(其中一个数据帧的大小约为 5 GB)。 此外,我通常用一个数字和我用来连接的字段重新分区,以便在连接之前尽可能多地控制分区。因此,我使用shuffle将连接转换为具有窄依赖关系的映射连接。 场景是: 我有24个执行器,每个执行器有4个核心 我
我建立了一个数据库连接。然后,我尝试建立连接,如果出现异常,则返回。在代码的下面,我有一个Finally块,我打算用它作为一个catch all来关闭连接。 然而,如果初始连接失败,它将跳转到我的con所在的最后一个块。close()引发空指针异常。最好的办法是什么?有没有办法测试con是否为null?我试过if(con.isValid(0))和con。等于(null)和con==null,它们都
问题内容: 我正在使用EF 6.1.0 我在下面将自定义DBContex对象作为DBEntites 我对上下文对象执行以下操作 但是在放置上下文对象之后,我仍然可以看到一个活动的数据库连接。在连接状态下,我可以看到该连接已经关闭(该连接从未为真)。 我正在使用以下查询来查看SQL上的连接。 在下面的语句中,增加了sql连接计数。但是即使处置了它也从未失败过。(我的意思是在使用块计算后,它应该关闭连
我正在研究建立一个JDBC Spark连接,以便从r/Python使用。我知道和都是可用的,但它们似乎更适合交互式分析,特别是因为它们为用户保留了集群资源。我在考虑一些更类似于Tableau ODBC Spark connection的东西--一些更轻量级的东西(据我所知),用于支持简单的随机访问。虽然这似乎是可能的,而且有一些文档,但(对我来说)JDBC驱动程序的需求是什么并不清楚。 既然Hiv
我正在处理UDF中的空值,该UDF在数据帧(源自配置单元表)上运行,该数据帧由浮点数结构组成: 数据帧()具有以下架构: 例如,我想计算x和y的总和。请注意,我不会在以下示例中“处理”空值,但我希望能够在我的udf中检查、或是否。 第一种方法: 如果<code>struct是否为空,因为在scala中<code>浮点不能为空。 第二种方法: 这种方法,我可以在我的udf中检查是否为空,但我可以检查