如何提高广播加入速度与Spark之间的条件

der*_*rek 7 apache-spark apache-spark-sql

我有两个数据帧A和B.A很大(100 G),B相对较小(100 M).A的分区号是8,B的分区号是1.

A.join(broadcast(B), $"cur" >= $"low" &&  $"cur" <= $"high", "left_outer")
Run Code Online (Sandbox Code Playgroud)

速度很慢(> 10小时).

但是,如果我将连接条件更改为:

A.join(broadcast(B), $"cur" === $"low" , "left_outer")
Run Code Online (Sandbox Code Playgroud)

它变得非常快(<30分钟).但条件不能改变.

那么有什么方法可以进一步提高我原来的连接条件下的连接速度?

use*_*411 13

诀窍是重写join条件,因此它包含=可用于优化查询和缩小可能匹配的组件.对于数值,您可以对数据进行bucketize并使用存储桶进行连接.

假设您的数据如下所示:

val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))
Run Code Online (Sandbox Code Playgroud)

首先选择适合您数据的铲斗尺寸.在这种情况下,我们可以使用50:

val bucketSize = 50L
Run Code Online (Sandbox Code Playgroud)

为以下各行分配存储桶a:

val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)
Run Code Online (Sandbox Code Playgroud)

创建将为范围发出存储桶的UDF:

def get_buckets(bucketSize: Long) = 
  udf((low: Long, high: Long) => {
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })
Run Code Online (Sandbox Code Playgroud)

和桶b:

val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low",  $"high"))
)
Run Code Online (Sandbox Code Playgroud)

join条件下使用桶:

aBucketed.join(
  broadcast(bBucketed), 
  aBucketed("bucket") === bBucketed("bucket") && 
    $"cur" >= $"low" &&  
    $"cur" <= $"high",
  "leftouter"
)
Run Code Online (Sandbox Code Playgroud)

这样Spark就会使用BroadcastHashJoin:

*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
:  +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:     +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
   +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
      +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
         +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
            +- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)

而不是BroadcastNestedLoopJoin:

== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:  +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
      +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
         +- *Range (0, 100, step=1, splits=Some(8))
Run Code Online (Sandbox Code Playgroud)

您可以调整存储桶大小以在精度和数据大小之间取得平衡.

如果您不介意较低级别的解决方案,则broadcast使用具有常量项目访问权限的排序序列(如ArrayVector),并使用udf二进制搜索进行连接.

您还应该查看分区数量.100GB的8个分区似乎相当低.

另见: