unv*_*dim 4 apache-spark pyspark
给定具有 2 个核心的单工作 Spark 集群上的 PySpark 代码:
df = spark.table('table')
df = df.dropDuplicates(['checksum'])
#
df = df.cache()
...
df.write.save('...)
Run Code Online (Sandbox Code Playgroud)
当缓存存在时,它生成并执行一个包含 200 个分区的计划;当缓存df.cache()不存在时,它只生成并执行 2 个分区。
我特别想知道缓存对这种情况下的规划的影响。
类似的影响df.cache似乎对 AQE(自适应查询执行)有影响,如果在昂贵的洗牌后缓存 DataFrame,则合并洗牌后分区似乎不会发生。
这是一个非常好的问题!
我深入研究,我可以确认默认情况下带有 AQE 的 Spark 的行为就像您所描述的那样。AQE 未优化缓存的关系。
此票证中解决了这个问题:https ://issues.apache.org/jira/browse/SPARK-35332
这张票的结果是这已更改,现在我们可以将spark.sql.optimizer.canChangeCachedPlanOutputPartitioning设置为true以允许AQE在缓存期间更改分区
我用 Spark 3.2 对 Databricks 集群进行了测试
示例代码:
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", false)
val data = Seq(("1", "Frankfurt am main", "Germany"),("1", "Frankfurt am main", "Germany"))
val df = data.toDF("Id", "City", "Country")
val uniqueRecords = df.dropDuplicates("City").cache()
uniqueRecords.show()
Run Code Online (Sandbox Code Playgroud)
使用默认设置我有这个阶段:
物理计划:
== Physical Plan ==
CollectLimit (9)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- SortAggregate (8)
+- Sort (7)
+- Exchange (6)
+- SortAggregate (5)
+- * Sort (4)
+- * LocalTableScan (3)
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,根本没有 AQE
现在让我们尝试一下
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", true)
Run Code Online (Sandbox Code Playgroud)
阶段:
和物理计划:
== Physical Plan ==
CollectLimit (10)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- AdaptiveSparkPlan (9)
+- SortAggregate (8)
+- Sort (7)
+- Exchange (6)
+- SortAggregate (5)
+- Sort (4)
+- LocalTableScan (3)
Run Code Online (Sandbox Code Playgroud)
所以我可以确认,当你在 Spark 3.2 及更高版本上更改此参数时,它会起作用。AQE 在平面图中可见,分区已合并
| 归档时间: |
|
| 查看次数: |
799 次 |
| 最近记录: |