Spark数据集/数据帧连接NULL倾斜键

Mik*_*kov 5 skew apache-spark apache-spark-sql

使用 Spark Dataset/DataFrame 连接时,我面临长时间运行并因 OOM 作业而失败。

这是输入:

  • 约 10 个不同大小的数据集,大部分都很大(>1 TB)
  • 所有左连接到一个基础数据集
  • 一些连接键是null

经过一些分析,我发现失败和缓慢的作业原因是null偏斜键:当左侧有数百万条带有 join key 的记录时null

我采取了一些蛮力方法来解决这个问题,在这里我想分享一下。

如果您有更好的或任何内置的解决方案(对于常规 Apache Spark),请分享。

Hen*_*art 5

我不久前也遇到过同样的问题,但在进行一些性能测试后我选择了另一种方法。这取决于您的数据,数据会告诉您解决此连接问题的更好算法是什么。

就我而言,连接左侧有超过 30% 的数据为 null,并且数据采用 parquet 格式。鉴于此,我最好执行filter此键为空和此键不为空的情况,仅当不为空时才加入,然后合并两个数据。

val data = ...
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)

joinable.join(...) union notJoinable
Run Code Online (Sandbox Code Playgroud)

它也避免了热点。如果我使用你的方法(负数/无论什么不是“可连接”值),spark将洗牌所有这些数据,这是大量数据(超过30%)。

只是想向您展示解决问题的另一种方法,


Mik*_*kov 3

这是我得出的解决方案:

  /**
    * Expression that produce negative random between -1 and -`lowestValue`(inclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("negative", negativeRandomWithin(3))
    *                  .select("negative")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |negative|
    *          +--------+
    *          |-2      |
    *          |-3      |
    *          |-1      |
    *          +--------+
    */
  private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
    negate(positiveRandomWithin(lowestValue)) - 1
  }

  /**
    * Expression that produce positive random between 0 and `highestValue`(exclusively).
    *
    * @example
    *          {{{
    *             spark
    *                  .range(1, 100)
    *                  .withColumn("positive", positiveRandomWithin(3))
    *                  .select("positive")
    *                  .distinct()
    *                  .show(false)
    *          }}}
    *          +--------+
    *          |positive|
    *          +--------+
    *          |0       |
    *          |1       |
    *          |2       |
    *          +--------+
    */
  private[transformation] def positiveRandomWithin(highestValue: Long) = {
    pmod((rand * highestValue).cast(LongType), lit(highestValue))
  }

  implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {

    /**
      * Particular optimized version of left outer join where left side of join has skewed `null` field.
      *
      * @note
      *       It works only for single column join which is applicable for `isNotNull`.
      *
      * Optimization algorithm:
      *   1. replace left dataset `null` values with negative number within range between -1 and - `nullNumBuckets`(10000 by default)
      *   2. use appended column, with original join column value and `null` replacements, as join column from left dataset
      *      appended column name builds using original left join column and `skewedColumnPostFix` separated by underscore.
      *
      * @note there is no checks how many `null` values on left dataset before applying above steps,
      *       as well as there is no checks does it sort merge join or broadcast.
      *
      * IMPORTANT: If left dataset already has appended column name, it will be reused to benefit already repartitioned data on the left
      *
      * HIGHLY IMPORTANT: right dataset should not contain negative values in `joinRightCol`
      */
    private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                                 joinLeftCol: Column,
                                                 joinRightCol: Column,
                                                 skewedColumnPostFix: String = "skewed_column",
                                                 nullNumBuckets: Int = 10000): DataFrame = {

      val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

      if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
        underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      } else {
        underlying
          .withColumn(skewedTempColumn,
                      when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
          .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
      }
    }
  }
Run Code Online (Sandbox Code Playgroud)

简而言之:我用null负范围替换左数据集连接键值,以使其均匀地重新分区。

注意:此解决方案仅适用于左连接和null连接键倾斜。我不想分解正确的数据集并对任何键进行倾斜解决方案。另外,在该步骤之后,null连接键值将分布到不同的分区,因此mapPartitions等等将不起作用。

总之,上述解决方案对我有帮助,但我希望看到针对此类数据集连接问题的更多解决方案。