为什么每当使用缓存(持久)时 Spark 查询计划都会显示更多分区

unv*_*dim 4 apache-spark pyspark

给定具有 2 个核心的单工作 Spark 集群上的 PySpark 代码:

df = spark.table('table')

df = df.dropDuplicates(['checksum'])

#
df = df.cache()

...

df.write.save('...)

Run Code Online (Sandbox Code Playgroud)

当缓存存在时,它生成并执行一个包含 200 个分区的计划;当缓存df.cache()不存在时,它只生成并执行 2 个分区。

我特别想知道缓存对这种情况下的规划的影响。

带缓存: 使用 df.cache

没有缓存: 没有缓存

类似的影响df.cache似乎对 AQE(自适应查询执行)有影响,如果在昂贵的洗牌后缓存 DataFrame,则合并洗牌后分区似乎不会发生。

M_S*_*M_S 5

这是一个非常好的问题!

我深入研究,我可以确认默认情况下带有 AQE 的 Spark 的行为就像您所描述的那样。AQE 未优化缓存的关系。

此票证中解决了这个问题:https ://issues.apache.org/jira/browse/SPARK-35332

这张票的结果是这已更改,现在我们可以将spark.sql.optimizer.canChangeCachedPlanOutputPartitioning设置为true以允许AQE在缓存期间更改分区

我用 Spark 3.2 对 Databricks 集群进行了测试

示例代码:

import org.apache.spark.sql.functions._

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", false)

val data = Seq(("1", "Frankfurt am main", "Germany"),("1", "Frankfurt am main", "Germany"))
val df = data.toDF("Id", "City", "Country")
val uniqueRecords = df.dropDuplicates("City").cache()
uniqueRecords.show()
Run Code Online (Sandbox Code Playgroud)

使用默认设置我有这个阶段:

在此输入图像描述

物理计划:

== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- SortAggregate (8)
               +- Sort (7)
                  +- Exchange (6)
                     +- SortAggregate (5)
                        +- * Sort (4)
                           +- * LocalTableScan (3)
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,根本没有 AQE

现在让我们尝试一下

spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)
Run Code Online (Sandbox Code Playgroud)

阶段:

在此输入图像描述

和物理计划:

== Physical Plan ==
CollectLimit (10)
+- InMemoryTableScan (1)
      +- InMemoryRelation (2)
            +- AdaptiveSparkPlan (9)
               +- SortAggregate (8)
                  +- Sort (7)
                     +- Exchange (6)
                        +- SortAggregate (5)
                           +- Sort (4)
                              +- LocalTableScan (3)
Run Code Online (Sandbox Code Playgroud)

所以我可以确认,当你在 Spark 3.2 及更高版本上更改此参数时,它会起作用。AQE 在平面图中可见,分区已合并