der*_*rek 7 [apache-spark] [apache-spark-sql]
I have two DataFrames, A and B. A is large (100 GB) while B is fairly small (100 MB). A has 8 partitions and B has 1.
A.join(broadcast(B), $"cur" >= $"low" && $"cur" <= $"high", "left_outer")
It is very slow (> 10 hours).
However, if I change the join condition to:
A.join(broadcast(B), $"cur" === $"low" , "left_outer")
it becomes very fast (< 30 minutes), but the condition cannot be changed.
Is there any way to further improve the join speed under my original join condition?
use*_*411 13
The trick is to rewrite the join condition so that it contains an = component that can be used to optimize the query and narrow down the possible matches. For numeric values you can bucketize your data and use buckets in the join condition.
Suppose your data looks like this:
val a = spark.range(100000)
  .withColumn("cur", (rand(1) * 1000).cast("bigint"))

val b = spark.range(100)
  .withColumn("low", (rand(42) * 1000).cast("bigint"))
  .withColumn("high", ($"low" + rand(-42) * 10).cast("bigint"))
First choose a bucket size appropriate for your data. In this case you can use 50:
val bucketSize = 50L
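To see why this works (hypothetical values, not drawn from the data above): with bucketSize = 50, a range such as [123, 131] falls entirely into bucket 100, while [148, 156] spans buckets 100 and 150. Since the ranges generated above are at most about 10 wide, each row of b covers at most two buckets.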
Assign a bucket to each row of a:
val aBucketed = a.withColumn(
  "bucket", ($"cur" / bucketSize).cast("bigint") * bucketSize
)
Create a UDF that emits the buckets for a range:
def get_buckets(bucketSize: Long) =
  udf((low: Long, high: Long) => {
    // Lowest and highest buckets touched by the [low, high] range.
    val min = (low / bucketSize) * bucketSize
    val max = (high / bucketSize) * bucketSize
    (min to max by bucketSize).toSeq
  })
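A quick way to sanity-check the UDF (a hypothetical one-row example, assuming spark.implicits._ is in scope):

Seq((148L, 156L)).toDF("low", "high")
  .select(get_buckets(bucketSize)($"low", $"high").as("buckets"))
  .show()
// +----------+
// |   buckets|
// +----------+
// |[100, 150]|
// +----------+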
and bucket b:
val bBucketed = b.withColumn(
  "bucket", explode(get_buckets(bucketSize)($"low", $"high"))
)
Use the buckets in the join condition:
aBucketed.join(
  broadcast(bBucketed),
  aBucketed("bucket") === bBucketed("bucket") &&
    $"cur" >= $"low" &&
    $"cur" <= $"high",
  "leftouter"
)
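To see which strategy Spark picked, print the physical plan (a sketch; joined is a hypothetical name for the result of the join above, and the explain() call produces the output shown next):

joined.explain()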
This way Spark will use a BroadcastHashJoin:
*BroadcastHashJoin [bucket#184L], [bucket#178L], LeftOuter, BuildRight, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cur#98L, (cast((cast(cur#98L as double) / 50.0) as bigint) * 50) AS bucket#184L]
:  +- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:     +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange HashedRelationBroadcastMode(List(input[3, bigint, false]))
   +- Generate explode(if ((isnull(low#105L) || isnull(high#109L))) null else UDF(low#105L, high#109L)), true, false, [bucket#178L]
      +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
         +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
            +- *Range (0, 100, step=1, splits=Some(8))
instead of a BroadcastNestedLoopJoin:
== Physical Plan ==
BroadcastNestedLoopJoin BuildRight, LeftOuter, ((cur#98L >= low#105L) && (cur#98L <= high#109L))
:- *Project [id#95L, cast((rand(1) * 1000.0) as bigint) AS cur#98L]
:  +- *Range (0, 100000, step=1, splits=Some(8))
+- BroadcastExchange IdentityBroadcastMode
   +- *Project [id#102L, low#105L, cast((cast(low#105L as double) + (rand(-42) * 10.0)) as bigint) AS high#109L]
      +- *Project [id#102L, cast((rand(42) * 1000.0) as bigint) AS low#105L]
         +- *Range (0, 100, step=1, splits=Some(8))
You can adjust the bucket size to balance between precision and data size: smaller buckets leave fewer false candidates for the range predicates to filter out, but each range in b explodes into more rows, making bBucketed larger to broadcast.
If you don't mind a lower-level solution, broadcast a sorted sequence with constant-time item access (like an Array or a Vector) and use a udf that performs a binary search for joining.
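A minimal sketch of that approach (not from the original answer; bRanges and find_id are hypothetical names, the ranges in b are assumed not to overlap so each cur matches at most one row, and spark.implicits._ is assumed to be in scope):

// Collect b's ranges, sorted by low bound, and broadcast them.
val ranges = b.select($"low", $"high", $"id")
  .as[(Long, Long, Long)]
  .collect()
  .sortBy(_._1)
val bRanges = spark.sparkContext.broadcast(ranges)

// Binary search for the last range whose low bound is <= cur,
// then verify cur against its high bound.
val find_id = udf((cur: Long) => {
  val rs = bRanges.value
  var lo = 0
  var hi = rs.length - 1
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (rs(mid)._1 <= cur) lo = mid + 1 else hi = mid - 1
  }
  if (hi >= 0 && cur <= rs(hi)._2) Some(rs(hi)._3) else None
})

// Plays the role of the left outer join, keeping only b's id.
val joined = a.withColumn("b_id", find_id($"cur"))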
You should also take a look at the number of partitions; 8 partitions for 100 GB seems quite low.
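For instance (a sketch; the target count is an assumption, not part of the original answer), repartitioning A before the join gives Spark more parallelism to work with:

// ~100 GB / 800 partitions ≈ 125 MB per partition.
val aRepartitioned = a.repartition(800)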