如何强制spark中数据包的完全外部联接以使用Boradcast散列联接?下面是代码片段:
sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
org.apache.spark.sql.functions.broadcast(SmallTable),
Seq("X", "Y", "Z", "W", "V"),
"outer"
)
但是,当我使用“outer
”作为联接类型时,spark出于某种未知原因决定使用sortmergejoin
。有人知道怎么解决这个问题吗?根据我在左外部联接中看到的性能,BroadcasThashjoin
将有助于加快应用程序的速度。
spark出于某种未知的原因决定使用SortMergeJoin。有人知道怎么解决这个问题吗?
原因:FullOuter(指任何关键字outer
、full
、FullOuter
)不支持广播散列连接(也就是map side连接)
如何证明这一点?
package com.examples import org.apache.log4j.{Level, Logger} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions._ /** * Join Example and some basics demonstration using sample data. * * @author : Ram Ghadiyaram */ object JoinExamples extends Logging { // switch off un necessary logs Logger.getLogger("org").setLevel(Level.OFF) val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate; case class Person(name: String, age: Int, personid: Int) case class Profile(name: String, personId: Int, profileDescription: String) /** * main * * @param args Array[String] */ def main(args: Array[String]): Unit = { spark.conf.set("spark.sql.join.preferSortMergeJoin", "false") import spark.implicits._ spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString())) /** * create 2 dataframes here using case classes one is Person df1 and another one is profile df2 */ val df1 = spark.sqlContext.createDataFrame( spark.sparkContext.parallelize( Person("Sarath", 33, 2) :: Person("KangarooWest", 30, 2) :: Person("Ravikumar Ramasamy", 34, 5) :: Person("Ram Ghadiyaram", 42, 9) :: Person("Ravi chandra Kancharla", 43, 9) :: Nil)) val df2 = spark.sqlContext.createDataFrame( Profile("Spark", 2, "SparkSQLMaster") :: Profile("Spark", 5, "SparkGuru") :: Profile("Spark", 9, "DevHunter") :: Nil ) // you can do alias to refer column name with aliases to increase readablity val df_asPerson = df1.as("dfperson") val df_asProfile = df2.as("dfprofile") /** * * Example displays how to join them in the dataframe level * next example demonstrates using sql with createOrReplaceTempView */ val joined_df = df_asPerson.join( broadcast(df_asProfile) , col("dfperson.personid") === col("dfprofile.personid") , "outer") val joined = joined_df.select( col("dfperson.name") , col("dfperson.age") , col("dfprofile.name") , col("dfprofile.profileDescription")) joined.explain(false) // it will show which join was used joined.show } }
== Physical Plan == *Project [name#4, age#5, name#11, profileDescription#13] +- SortMergeJoin [personid#6], [personid#12], FullOuter :- *Sort [personid#6 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(personid#6, 200) : +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6] : +- Scan ExternalRDDScan[obj#3] +- *Sort [personid#12 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(personid#12, 200) +- LocalTableScan [name#11, personId#12, profileDescription#13] +--------------------+---+-----+------------------+ | name|age| name|profileDescription| +--------------------+---+-----+------------------+ | Ravikumar Ramasamy| 34|Spark| SparkGuru| | Ram Ghadiyaram| 42|Spark| DevHunter| |Ravi chandra Kanc...| 43|Spark| DevHunter| | Sarath| 33|Spark| SparkSQLMaster| | KangarooWest| 30|Spark| SparkSQLMaster| +--------------------+---+-----+------------------+
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
这是编写sparkstrategies.scala
(负责将逻辑计划转换为零个或多个SparkPlans)的说明,您不想使用sortmergejoin
。
此属性spark.sql.join.preferSortMergeJoin
如果为true,则更喜欢通过此PREFER_SORTMERGEJOIN属性进行排序合并联接,而不是shuffle hash联接。
设置false
意味着spark不能只选择broadcasthashjoin,它也可以是其他任何东西(例如shuffle hash join)。
>
sqlconf.auto_broadcastjoin_threshold
]]阈值,或者如果该一侧有显式的广播提示(例如,用户将[[org.apache.spark.sql.functions.broadcast()
]]函数应用于dataframe
),那么连接的这一侧将被广播,另一侧将被流式传输,不执行洗牌。如果连接的双方都有资格广播,则洗牌哈希联接:如果单个分区的平均大小足够小,可以构建哈希表。
排序合并:如果匹配的联接键是可排序的。
我有一个db2查询,今天我意识到需要扩展该查询。 我的表使用联接已经相当复杂了,所以我并不想添加联合查询。我想做一个完整的外部连接。 我想是因为另一个左联接。
问题内容: 如何使用django QuerySet API创建跨M2M关系芯片的完全外部联接的查询? 它不受支持,欢迎提供有关创建我自己的经理来执行此操作的提示。 编辑添加: @ S.Lott:感谢您的启发。应用程序需要使用OUTER JOIN。即使它仍然不完整,它也必须生成一个报告,显示输入的数据。我不知道结果将是一个新的类/模型。您的提示将对我有很大帮助。 问题答案: Django在通常的SQ
如何创建合并来自两个不同表的不同所有列的视图。 这给了我一个错误: 重复的列名“tID” 有没有一种方法可以连接两个表,而不需要列出所有要选择的值?
这是一个关于data.table连接语法的哲学问题。我发现越来越多的数据.表的用途,但仍在学习... data.tables的联接格式非常简洁、方便和高效,但据我所知,它只支持内部联接和右外部联接。要获得左或完全的外部联接,我需要使用: --Y中的所有行--右外部联接(默认) --仅限在X和Y中都匹配的行--内部联接 --来自X和Y的所有行--完全外部联接 --X中的所有行--左外部联接 在我看来
问题内容: 我正在尝试编写一个join语句来将以下三个数据集连接在一起。(这是使用MS SQL Server的) 我认为完全可以通过外部联接来做到这一点,但是我遇到了主要的跨产品问题。 问题答案: 试试看:
问题内容: 我的目标是在从链接读取href属性时始终获取相同的字符串(在我的情况下为URI)。举例:假设认为,一个HTML文件,它有一个像somany链接 ,但基本域是http://www.domainname.com/index.html 但基本域是http://www.domainname.com/dit/index.html 如何我可以正确获取所有链接意味着完整链接包括域名吗? 我如何在Ja