When joining Spark Datasets/DataFrames, I was facing long-running jobs that failed with OOM errors.
After some analysis, I found that the cause of the slow and failing jobs was key skew: the left side had millions of records whose join key was null.
I took a somewhat brute-force approach to work around this, which I want to share here.
If you have a better solution, or any built-in one (for regular Apache Spark), please share it.
I ran into the same problem a while ago, but I chose a different approach after some performance testing. It depends on your data: the data will tell you which algorithm is better for solving this join problem.
In my case, more than 30% of the rows on the left side of the join had a null key, and the data was in Parquet format. Given that, it was better for me to filter the rows where the key is null from those where it is not, join only the non-null rows, and then union the two results:
val data = ...  // import spark.implicits._ is needed for the 'keyToJoin symbol syntax

// Null keys cannot match anything in the join, so keep them out of the shuffle entirely.
val notJoinable = data.filter('keyToJoin.isNull)
val joinable = data.filter('keyToJoin.isNotNull)

// Join only the non-null rows, then bring the null-key rows back.
joinable.join(...) union notJoinable
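For context, here is a minimal self-contained sketch of that filter-and-union approach; the table and column names (data, dim, keyToJoin, dimValue) are made up for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().appName("null-skew-filter-union").master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative data: one joinable row, one row with a null key.
val data = Seq((Option(1L), "a"), (Option.empty[Long], "b")).toDF("keyToJoin", "payload")
val dim = Seq((1L, "dim1")).toDF("keyToJoin", "dimValue")

val notJoinable = data.filter(col("keyToJoin").isNull)
val joinable = data.filter(col("keyToJoin").isNotNull)

// Join only the rows with a non-null key.
val joined = joinable.join(dim, Seq("keyToJoin"), "left")

// Align schemas before the union: null-key rows get a null dimension column.
val result = joined.unionByName(notJoinable.withColumn("dimValue", lit(null).cast("string")))
result.show(false)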
It also avoids the hotspot. If I had used your approach (negative numbers, or whatever other non-"joinable" value), Spark would have shuffled all of that data, which is a lot of data (over 30%).
Just wanted to show you another way of approaching the problem.
Here is the solution I came up with:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

/**
* Expression that produces a negative random number between -1 and -`lowestValue` (inclusive).
*
* @example
* {{{
* spark
* .range(1, 100)
* .withColumn("negative", negativeRandomWithin(3))
* .select("negative")
* .distinct()
* .show(false)
* }}}
* +--------+
* |negative|
* +--------+
* |-2 |
* |-3 |
* |-1 |
* +--------+
*/
private[transformation] def negativeRandomWithin(lowestValue: Long): Column = {
  negate(positiveRandomWithin(lowestValue)) - 1
}
/**
* Expression that produces a positive random number between 0 and `highestValue` (exclusive).
*
* @example
* {{{
* spark
* .range(1, 100)
* .withColumn("positive", positiveRandomWithin(3))
* .select("positive")
* .distinct()
* .show(false)
* }}}
* +--------+
* |positive|
* +--------+
* |0 |
* |1 |
* |2 |
* +--------+
*/
private[transformation] def positiveRandomWithin(highestValue: Long): Column = {
  pmod((rand * highestValue).cast(LongType), lit(highestValue))
}
implicit class SkewedDataFrameExt(val underlying: DataFrame) extends AnyVal {
/**
* A specialized left outer join, optimized for the case where the left side of the join
* has a skewed `null` key.
*
* @note
* It works only for single-column joins where `isNotNull` is applicable.
*
* Optimization algorithm:
* 1. Replace `null` values in the left dataset's join column with negative numbers
*    in the range from -1 to -`nullNumBuckets` (10000 by default).
* 2. Use the appended column, holding the original join column values plus the `null`
*    replacements, as the join column of the left dataset. The appended column name is
*    built from the original left join column name and `skewedColumnPostFix`,
*    separated by an underscore.
*
* @note There is no check of how many `null` values the left dataset contains before
* applying the steps above, nor whether the join is a sort-merge join or a broadcast join.
*
* IMPORTANT: if the left dataset already contains the appended column, it is reused,
* to benefit from left-side data that has already been repartitioned.
*
* HIGHLY IMPORTANT: the right dataset must not contain negative values in `joinRightCol`.
*/
  private[transformation] def nullSkewLeftJoin(right: DataFrame,
                                               joinLeftCol: Column,
                                               joinRightCol: Column,
                                               skewedColumnPostFix: String = "skewed_column",
                                               nullNumBuckets: Int = 10000): DataFrame = {
    val skewedTempColumn = s"${joinLeftCol.toString()}_$skewedColumnPostFix"

    if (underlying.columns.exists(_ equalsIgnoreCase skewedTempColumn)) {
      underlying.join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
    } else {
      underlying
        .withColumn(skewedTempColumn,
          when(joinLeftCol.isNotNull, joinLeftCol).otherwise(negativeRandomWithin(nullNumBuckets)))
        .join(right.where(joinRightCol.isNotNull), col(skewedTempColumn) === joinRightCol, "left")
    }
  }
}
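A hypothetical usage sketch of the extension method above (the Parquet paths and the user_id/id column names are made up; the right dataset is assumed to contain no negative ids):

// Assuming the implicit class and helpers above are in scope.
import org.apache.spark.sql.functions.col

val left = spark.read.parquet("/data/events")   // skewed: many rows with a null user_id
val users = spark.read.parquet("/data/users")   // precondition: no negative values in "id"

val joined = left.nullSkewLeftJoin(users, col("user_id"), col("id"))

// The helper column can be dropped once the join is done.
val result = joined.drop("user_id_skewed_column")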
In short: I replace the `null` join key values in the left dataset with values from a negative range, so that the data gets repartitioned evenly.
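To see the effect, here is a small sketch (assuming negativeRandomWithin and the imports above are in scope; the column name is illustrative):

// Each null key is replaced by one of nullNumBuckets negative values (here 5),
// so the single hot null partition is spread over many partitions.
val sample = Seq(Option.empty[Long], None, None, Option(42L)).toDF("key")

sample
  .withColumn("key_skewed_column",
    when(col("key").isNotNull, col("key")).otherwise(negativeRandomWithin(5)))
  .show(false)
// The null rows now carry random values in [-5, -1]; 42 passes through unchanged.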
Note: this solution applies only to left joins with `null` join key skew. I did not want to explode the right dataset and apply a skew solution for arbitrary keys. Also, after this step the `null` join key values are distributed across different partitions, so mapPartitions and the like will not work.
To sum up, the solution above helped me, but I would like to see more solutions for this kind of dataset join problem.