我有一张小桌子(2k)的记录和一张大桌子(5 mil)的记录。我需要从小表中获取所有数据,只从大表中获取匹配数据,因此我在下面执行了查询select/*broadcast(small)*/small。*虽然查询返回正确的结果,但当我检查查询计划时,它显示排序合并的广播哈希连接。如果小桌子是我们不能广播的桌子,有什么限制吗。
在通过广播左表进行左连接时,更改表的顺序,以便广播右表(或)将连接类型更改为
right
。
select /*+ broadcast(small)*/ small.* From small right outer join large
select /*+ broadcast(small)*/ small.* From large left outer join small
示例:
df=spark.createDataFrame([(1,'a')],['id','name'])
df1=spark.createDataFrame([(1,'a')],['id','name'])
#broadcasting on right df1 and performing left join
df.join(broadcast(df1),['id'],'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
# :- Scan ExistingRDD[id#0L,name#1]
# +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
# +- *(1) Filter isnotnull(id#4L)
# +- Scan ExistingRDD[id#4L,name#5]
#broadcasting df1 and right join defaults to Sortmerge join
df.join(broadcast(df1),['id'],'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
# :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(id#0L, 200)
# : +- *(1) Filter isnotnull(id#0L)
# : +- Scan ExistingRDD[id#0L,name#1]
# +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#4L, 200)
# +- Scan ExistingRDD[id#4L,name#5]
由于要从小表而不是大表中选择完整的数据集,因此 Spark 不会强制实施广播联接。当您更改加入顺序或转换为等联接时,Spark 会很乐意强制执行广播联接。
例如:
原因:*Spark将向存在大表数据的所有数据节点共享小表,也称为广播表。在您的情况下,我们需要小表中的所有数据,但只需要大表中的匹配数据。所以spark不知道这个记录是否在另一个数据节点匹配,甚至根本不匹配。由于这种模糊性,它无法从小表中选择所有记录(如果是分布式的)。所以spark在本例中没有使用广播连接*
我在aws s3和emr上使用Spark 2.4进行项目,我有一个左连接,有两个巨大的数据部分。火花执行不稳定,它经常因内存问题而失败。 集群有10台m3.2xlarge类型的机器,每台机器有16个vCore、30 GiB内存、160个SSD GB存储。 我有这样的配置: 左侧连接发生在 150GB 的左侧和大约 30GB 的右侧之间,因此有很多随机播放。我的解决方案是将右侧切得足够小,例如 1G
我正在尝试优化我的火花应用工作。 我试图理解这个问题的要点:如何在唯一键上连接数据帧时避免混乱? > 我已经确保必须发生加入操作的键分布在同一分区中(使用我的自定义分区程序)。 我也不能做广播加入,因为我的数据可能会根据情况变大。 在上面提到的问题的答案中,重新分区只优化了连接,但我需要的是无需切换即可连接。在分区内的键的帮助下,我对连接操作很满意。 有可能吗?如果不存在类似的功能,我想实现像jo
我使用spark-core 2.0.1版和Scala2.11。我有一个简单的代码来读取一个包含\escapes的csv文件。 null 有人面临同样的问题吗?我是不是漏掉了什么? 谢谢
我使用的是datastax提供的spark-cassandra-connector 1.1.0。我注意到了interining问题,我不知道为什么会发生这样的事情:当我广播cassandra connector并试图在执行程序上使用它时,我重复了异常,这表明我的配置无效,无法在0.0.0连接到cassandra。 示例StackTrace:
SMSlistener
我想在spark中读取一个CSV,将其转换为DataFrame,并使用将其存储在HDFS中 在Apache Spark中将CSV文件加载为DataFrame的正确命令是什么?