Spark Window 聚合与 Group By/Join 性能

use*_*130 5 apache-spark apache-spark-sql

与 group by/join 相比,我对在窗口上运行聚合函数的性能特征感兴趣。在这种情况下,我对具有自定义框架边界或排序的窗口函数不感兴趣,而只是作为运行聚合函数的一种方式。

请注意,我只对大小合适的数据量的批处理(非流)性能感兴趣,因此我已禁用以下广播连接。

例如,假设我们从以下 DataFrame 开始:

val df = Seq(("bob", 10), ("sally", 32), ("mike", 9), ("bob", 18)).toDF("name", "age")
df.show(false)

+-----+---+
|name |age|
+-----+---+
|bob  |10 |
|sally|32 |
|mike |9  |
|bob  |18 |
+-----+---+

Run Code Online (Sandbox Code Playgroud)

假设我们要计算每个名称出现的次数,然后在具有匹配名称的行上提供该计数。

分组依据/加入

val joinResult = df.join(
    df.groupBy($"name").count,
    Seq("name"),
    "inner"
)
joinResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |18 |2    |
|bob  |10 |2    |
+-----+---+-----+

joinResult.explain
== Physical Plan ==
*(4) Project [name#5, age#6, count#12L]
+- *(4) SortMergeJoin [name#5], [name#15], Inner
   :- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(name#5, 200)
   :     +- LocalTableScan [name#5, age#6]
   +- *(3) Sort [name#15 ASC NULLS FIRST], false, 0
      +- *(3) HashAggregate(keys=[name#15], functions=[count(1)])
         +- Exchange hashpartitioning(name#15, 200)
            +- *(2) HashAggregate(keys=[name#15], functions=[partial_count(1)])
               +- LocalTableScan [name#15]

Run Code Online (Sandbox Code Playgroud)

窗户

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{functions => f}

val windowResult = df.withColumn("count", f.count($"*").over(Window.partitionBy($"name")))
windowResult.show(false)

+-----+---+-----+
|name |age|count|
+-----+---+-----+
|sally|32 |1    |
|mike |9  |1    |
|bob  |10 |2    |
|bob  |18 |2    |
+-----+---+-----+

windowResult.explain
== Physical Plan ==
Window [count(1) windowspecdefinition(name#5, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS count#34L], [name#5]
+- *(1) Sort [name#5 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(name#5, 200)
      +- LocalTableScan [name#5, age#6]

Run Code Online (Sandbox Code Playgroud)

根据执行计划,Windowing 看起来更高效(更少的阶段)。所以我的问题是是否总是这样——我应该总是使用 Window 函数进行这种聚合吗?随着数据的增长,这两种方法是否会类似地扩展?极端偏斜怎么办(即某些名称比其他名称更常见)?

Dav*_*rba 8

这取决于数据。更具体地说,它取决于name列的基数。如果基数小,聚合后数据就会小,聚合结果可以在join中广播。在这种情况下,连接将比window. 另一方面,如果基数大,聚合后的数据也大,那么join会用 来规划SortMergeJoin,使用window效率会更高。

window我们有 1 次总洗牌 + 一种排序的情况下。在SortMergeJoin我们在左分支(总混洗 + 排序)加上额外的减少混洗和右侧分支排序的情况下(减少我的意思是首先聚合数据)。在连接的右侧分支中,我们还对数据进行了额外扫描。

此外,您可以查看我分析类似示例的 Spark Summit 中的视频