We all know that when writing SQL we generally follow a well-defined lexical order of operations:
SELECT ...
FROM ...
JOIN ...
WHERE ...
GROUP BY ...
HAVING ...
ORDER BY ...
How is this reflected in Spark? I do know it is all about attributes of particular objects, so let me pose the question differently: for someone coming from SQL, what is a useful way to think about the lexical order of operations when writing Spark applications?
To illustrate my confusion, here are two snippets from my tests in which I placed orderBy in two completely different positions (again, coming from a SQL background), yet the code produced exactly the same result:
tripDatawithDT \
    .filter(tripData["Subscriber Type"] == "Subscriber")\
    .orderBy(desc("End Date DT"))\
    .groupBy("End Date DT")\
    .count()\
    .show()

tripDatawithDT \
    .filter(tripData["Subscriber Type"] == "Subscriber")\
    .groupBy("End Date DT")\
    .count()\
    .orderBy(desc("End Date DT"))\
    .show()
Still, in other cases I have completely broken my code because the lexical order of operations was wrong.
TL;DR: As long as you use a standard open-source build without custom optimizer Rules, you can assume that each DSL operation induces a logical subquery, and that all logical optimizations are consistent with the SQL:2003 standard. In other words, your SQL knowledge should carry over here.
Internally, Spark represents a SQL query as a tree of LogicalPlans, where each operator corresponds to a node with its inputs as children.

As a result, the unoptimized logical plan corresponding to a DSL expression consists of one nested node per operator (projection, selection, sort, aggregation with or without grouping). So given the table
from pyspark.sql.functions import col, desc
t0 = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")
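A quick way to see these trees for yourself: DataFrame.explain(extended=True) prints the parsed, analyzed, and optimized logical plans along with the physical plan (this is how the plans quoted below were obtained; exact output varies across Spark versions). A minimal sketch using the t0 defined above:

from pyspark.sql.functions import col, desc

# Prints the parsed, analyzed and optimized logical plans
# plus the physical plan for this chain of DSL operations.
(t0.filter(col("Subscriber Type") == "Subscriber")
    .orderBy(desc("End Date DT"))
    .groupBy("End Date DT")
    .count()
    .explain(True))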
The first query
(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .orderBy(desc("End Date DT")).alias("t2")
    .groupBy("End Date DT")
    .count())
is roughly equivalent to*
SELECT `End Date DT`, COUNT(*) AS count FROM (
    SELECT * FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`
while
(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .groupBy("End Date DT")
    .count().alias("t2")
    .orderBy(desc("End Date DT")))
is roughly equivalent to**
SELECT * FROM (
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC
Obviously these two queries are not equivalent, and this is reflected in their optimized execution plans.
ORDER BY before GROUP BY corresponds to
== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
while ORDER BY after GROUP BY corresponds to
== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
So why can these give the same final result? In basic cases like this one, the query planner treats the preceding ORDER BY as a hint to apply range partitioning instead of hash partitioning. The physical plan for ORDER BY followed by GROUP BY is therefore
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
   +- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
         +- *(1) Project [End Date DT#38]
            +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
               +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Without the ORDER BY*** it defaults to hash partitioning:
== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
   +- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
      +- *(1) Project [End Date DT#38]
         +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
            +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Since this happens at the planning stage, which is a high-impact extension point (especially for data source providers), I would consider this an implementation detail and would not depend on this behavior for correctness.
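If a particular partitioning actually matters for your job, it is safer to request it explicitly than to rely on this planner behavior. A minimal sketch, assuming Spark 2.4+ (where repartitionByRange is available):

# Explicit hash partitioning on the grouping key.
hash_partitioned = t0.repartition("End Date DT")

# Explicit range partitioning on the grouping key (Spark 2.4+),
# i.e. what the preceding ORDER BY hinted at above.
range_partitioned = t0.repartitionByRange("End Date DT")

Both return new DataFrames whose plans contain an explicit Exchange, so a following aggregation on the same key may avoid an extra shuffle.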
* With the parsed logical plan for the DSL variant:
== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
   +- Sort [End Date DT#38 DESC NULLS LAST], true
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
and for the SQL variant:
== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
   +- 'Sort ['End Date DT DESC NULLS LAST], true
      +- 'Project [*]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
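Since t0 was registered as a temporary view earlier, the SQL-variant plans here and below can be reproduced the same way; a minimal sketch for the first query:

# explain(True) prints the same plan sections for a SQL query,
# starting with the parsed logical plan shown above.
spark.sql("""
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM (
            SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
        ) as t1 ORDER BY `End Date DT` DESC
    ) as t2 GROUP BY `End Date DT`
""").explain(True)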
** With the parsed logical plan for the DSL variant:
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
   +- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
and for the SQL variant:
== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
   +- 'SubqueryAlias `t2`
      +- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
*** i.e.
== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false