为了确保两个数据集的共置和共分区,它们必须在同一作业中由相同的键和分区数进行分区。
如果我加入这些数据集,结果加入的数据集会保留这个分区吗?
如果我然后在同一作业中使用相同的键和分区数对第三个数据集进行分区,这是否保证与加入的数据集的共同分区/托管?
我的理解是肯定的,Spark 有几个优化可以避免不必要的洗牌。让我们考虑示例:
lazy val spark: SparkSession =
SparkSession
.builder()
.appName(getClass.getSimpleName)
.master("local[2]")
.config("spark.sql.shuffle.partitions", "5")
.getOrCreate()
spark.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
import spark.implicits._
val df1 = spark.range(1, 100)
val df2 = spark.range(1, 200)
val df3 = spark.range(1, 300)
df1.join(df2, df1("id") === df2("id")).join(df3, df1("id") === df3("id")).explain(true)
Run Code Online (Sandbox Code Playgroud)
以及它的物理计划:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 5)
+- *Range (1, 300, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
正如您所看到的,每个数据仅重新分区一次,df1并且df2连接的结果不会在第二次重新分区。
这是借用分区数的默认行为 .config("spark.sql.shuffle.partitions", "5")
但是,如果你试图重新分区df3之内repartition这可能是有趣:
1.尝试按ID重新分区,分区数少于spark.sql.shuffle.partitions使用的分区数df1.join(df2...
val df3 = spark.range(1, 300).repartition(3, col("id"))
df1.join(df2, df1("id") === df2("id")).join(df3, df1("id") === df3("id")).explain(true)
Run Code Online (Sandbox Code Playgroud)
以及它的物理计划:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 5)
+- *Range (1, 300, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
正如您所看到的 - 同样的图片,Spark 优雅地忽略了repartition(3, col("id")).
df3具有更多分区的重新分区:
val df3 = spark.range(1, 300).repartition(10, col("id"))
df1.join(df2, df1("id") === df2("id")).join(df3, df1("id") === df3("id")).explain(true)
以及它的物理计划:
== Physical Plan ==
*SortMergeJoin [id#5L], [id#11L], Inner
:- *Sort [id#5L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#5L, 10)
: +- *SortMergeJoin [id#5L], [id#8L], Inner
: :- *Sort [id#5L ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(id#5L, 5)
: : +- *Range (1, 100, step=1, splits=2)
: +- *Sort [id#8L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#8L, 5)
: +- *Range (1, 200, step=1, splits=2)
+- *Sort [id#11L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#11L, 10)
+- *Range (1, 300, step=1, splits=2)
Run Code Online (Sandbox Code Playgroud)
您可能会发现,在加入和 的结果上发生了一个额外的重新分区。df1df2
注意:手动重新分区需要partitionExprs在调用repartition函数时使用相同的,否则会出现不必要的shuffle。
总而言之,Spark 在任何时候都有很好的优化,当你涉及自定义代码时要小心,至少使用explain.
希望能帮助到你!
| 归档时间: |
|
| 查看次数: |
2152 次 |
| 最近记录: |