当前位置: 首页 > 知识库问答 >
问题:

Spark数据集/数据帧连接空倾斜键

白宏义
2023-03-14

使用Spark Dataset/DataFrame联接时,我面临长时间运行且OOM作业失败的问题。

以下是输入:

  • ~10个不同大小的数据集,大部分是巨大的(

经过一番分析,我发现作业失败和缓慢的原因是null歪斜键:当左侧有数百万条记录时,用连接键null

我用了一些蛮力的方法来解决这个问题,这里我想和大家分享一下。

如果您有更好的或任何内置的解决方案(针对常规Apache Spark),请与他人分享。

共有2个答案

祖波光
2023-03-14

以下是我想到的解决方案:

  /**
    * Expression that produce negative random between -1 and -`lowestValue`(inclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produce positive random between 0 and `highestValue`(exclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long) = {
    pmod((rand * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * Particular optimized version of left outer join where left side of join has skewed `null` field.
      *
      * @note
      *       It works only for single column join which is applicable for `isNotNull`.
      *
      * Optimization algorithm:
      *   1. replace left dataset `null` values with negative number within range between -1 and - `nullNumBuckets`(10000 by default)
      *   2. use appended column, with original join column value and `null` replacements, as join column from left dataset
      *      appended column name builds using original left join column and `skewedColumnPostFix` separated by underscore.
      *
      * @note there is no checks how many `null` values on left dataset before applying above steps,
      *       as well as there is no checks does it sort merge join or broadcast.
      *
      * IMPORTANT: If left dataset already has appended column name, it will be reused to benefit already repartitioned data on the left
      *
      * HIGHLY IMPORTANT: right dataset should not contain negative values in `joinRightCol`
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }

简而言之:我将左数据集连接键null值替换为负范围,以使其均匀地重新分区。

注意:此解决方案仅适用于左连接和nulljoin键倾斜。我不想爆炸正确的数据集,并做任何关键的倾斜解决方案。此外,在该步骤之后,nulljoin键值将被分配到不同的分区,因此,mapPartitions等将不起作用。

作为总结,上述解决方案对我有所帮助,但我希望看到更多针对此类数据集连接问题的解决方案。

尚宏硕
2023-03-14

我之前也遇到过同样的问题,但在做了一些性能测试之后,我选择了另一种方法。这取决于您的数据,数据将告诉您解决此连接问题的更好算法。

在我的例子中,我有超过30%的数据在join的左侧带有null,并且数据是拼花格式的。考虑到这一点,我最好执行一个过滤器,其中该键为null,并且该键不为null,只在不为null时连接,然后合并这两个数据。

val data = ...
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)

joinable.join(...) union notJoinable

它也避免热点。如果我使用你的方法(负数/任何不是可连接的值),火花会洗牌所有这些数据,这是很多数据(超过30%)。

只是想给你展示解决问题的另一种方法,

 类似资料:
  • 我需要根据一些共享的键列将许多数据帧连接在一起。对于键值RDD,可以指定一个分区程序,以便具有相同键的数据点被洗牌到相同的执行器,因此连接更有效(如果在之前有与洗牌相关的操作)。可以在火花数据帧或数据集上做同样的事情吗?

  • 我正在尝试使用Spark数据集API,但在进行简单连接时遇到了一些问题。 假设我有两个带有字段的数据集:,那么在的情况下,我的连接如下所示: 但是,对于数据集,有一个。joinWith方法,但相同的方法不起作用:

  • null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?

  • 我正在使用: Python 3.6.8 火花2.4.4 我在spark类路径中有以下JAR: http://repo1.maven.org/maven2/com/databricks/spark-redshift_2.11/2.0.1/spark-redshift_2.11-2.0.1.jar http://repo1.maven.org/maven2/com/databricks/spark-a

  • 添加地标        其使用原理同6.1.1 模型平移        在LSV中可以对模型进行平移操作,一般用于将目标模型进行位置调整,或者将其高程进行调整(有些模型加载出来可能贴于地标所以无法显示出来,可通过调整高程使其显示)。        先选择自己所要平移模型的图层之后点击选择模型所平移的参考点的起点与终点,并且可对效果进行预览:        之后同样的在选择所需要平移模型的图层后可对

  • 打开数据        在“倾斜摄影”菜单栏中点击“打开数据”,找到本地倾斜摄影索引(lfp)文件存放位置,点击打开osgb转换后的lfp格式倾斜摄影数据文件(具体转换步骤见“倾斜摄影”菜单栏中的“数据转换”),该lfp文件包含三维模型所在的经度、纬度、高度值,便于倾斜摄影三维模型在地球上进行定位。支持倾斜摄影三维模型格式为smart3d生成的osgb格式。        打开后数据效果如下图。可