当前位置: 首页 > 知识库问答 >
问题:

Spark中的广播散列连接(BHJ)用于完整的外部连接(外部的、完整的、完整的)

咸正平
2023-03-14

如何强制spark中数据包的完全外部联接以使用Boradcast散列联接?下面是代码片段:

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "outer"
)

但是,当我使用“outer”作为联接类型时,spark出于某种未知原因决定使用sortmergejoin。有人知道怎么解决这个问题吗?根据我在左外部联接中看到的性能,BroadcasThashjoin将有助于加快应用程序的速度。

共有1个答案

伍捷
2023-03-14

spark出于某种未知的原因决定使用SortMergeJoin。有人知道怎么解决这个问题吗?

原因:FullOuter(指任何关键字outerfullFullOuter)不支持广播散列连接(也就是map side连接)

如何证明这一点?

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
  * Join Example and some basics demonstration using sample data.
  *
  * @author : Ram Ghadiyaram
  */
object JoinExamples extends Logging {
  // switch off  un necessary logs
  Logger.getLogger("org").setLevel(Level.OFF)
   val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate;
  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
    * main
    *
    * @param args Array[String]
    */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._

    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))
    /**
      * create 2 dataframes here using case classes one is Person df1 and another one is profile df2
      */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))


    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )

    // you can do alias to refer column name with aliases to  increase readablity

    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")
    /** *
      * Example displays how to join them in the dataframe level
      * next example demonstrates using sql with createOrReplaceTempView
      */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile)
      , col("dfperson.personid") === col("dfprofile.personid")
      , "outer")
    val joined = joined_df.select(
      col("dfperson.name")
      , col("dfperson.age")
      , col("dfprofile.name")
      , col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show

  }

}
== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]
+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

这是编写sparkstrategies.scala(负责将逻辑计划转换为零个或多个SparkPlans)的说明,您不想使用sortmergejoin

此属性spark.sql.join.preferSortMergeJoin如果为true,则更喜欢通过此PREFER_SORTMERGEJOIN属性进行排序合并联接,而不是shuffle hash联接。

设置false意味着spark不能只选择broadcasthashjoin,它也可以是其他任何东西(例如shuffle hash join)。

>

  • broadcast:如果连接的一侧的估计物理大小小于用户可配置的[[sqlconf.auto_broadcastjoin_threshold]]阈值,或者如果该一侧有显式的广播提示(例如,用户将[[org.apache.spark.sql.functions.broadcast()]]函数应用于dataframe),那么连接的这一侧将被广播,另一侧将被流式传输,不执行洗牌。如果连接的双方都有资格广播,则
  • 洗牌哈希联接:如果单个分区的平均大小足够小,可以构建哈希表。

    排序合并:如果匹配的联接键是可排序的。

  •  类似资料:
    • 我有一个db2查询,今天我意识到需要扩展该查询。 我的表使用联接已经相当复杂了,所以我并不想添加联合查询。我想做一个完整的外部连接。 我想是因为另一个左联接。

    • 问题内容: 如何使用django QuerySet API创建跨M2M关系芯片的完全外部联接的查询? 它不受支持,欢迎提供有关创建我自己的经理来执行此操作的提示。 编辑添加: @ S.Lott:感谢您的启发。应用程序需要使用OUTER JOIN。即使它仍然不完整,它也必须生成一个报告,显示输入的数据。我不知道结果将是一个新的类/模型。您的提示将对我有很大帮助。 问题答案: Django在通常的SQ

    • 如何创建合并来自两个不同表的不同所有列的视图。 这给了我一个错误: 重复的列名“tID” 有没有一种方法可以连接两个表,而不需要列出所有要选择的值?

    • 这是一个关于data.table连接语法的哲学问题。我发现越来越多的数据.表的用途,但仍在学习... data.tables的联接格式非常简洁、方便和高效,但据我所知,它只支持内部联接和右外部联接。要获得左或完全的外部联接,我需要使用: --Y中的所有行--右外部联接(默认) --仅限在X和Y中都匹配的行--内部联接 --来自X和Y的所有行--完全外部联接 --X中的所有行--左外部联接 在我看来

    • 问题内容: 我正在尝试编写一个join语句来将以下三个数据集连接在一起。(这是使用MS SQL Server的) 我认为完全可以通过外部联接来做到这一点,但是我遇到了主要的跨产品问题。 问题答案: 试试看:

    • 问题内容: 我的目标是在从链接读取href属性时始终获取相同的字符串(在我的情况下为URI)。举例:假设认为,一个HTML文件,它有一个像somany链接 ,但基本域是http://www.domainname.com/index.html 但基本域是http://www.domainname.com/dit/index.html 如何我可以正确获取所有链接意味着完整链接包括域名吗? 我如何在Ja