Spark 词法运算顺序

Geo*_*hev 4 apache-spark pyspark

我们都知道,在SQL中,一般来说,我们在编写代码时都有一个定义好的词法操作的顺序:

SELECT ...
FROM ...
JOIN ...
WHERE ...
GROUP BY ...
HAVING ...
ORDER BY ...
Run Code Online (Sandbox Code Playgroud)

Spark 中如何体现这一点?我确实知道这全都与特定对象的属性有关,所以如果我能以不同的方式提出这个问题 - 对于来自 SQL 的人来说,在编写 Spark 应用程序时考虑操作的词汇顺序的有用方法是什么?

为了说明我的困惑。这是我测试中的两段代码,我在其中放置了orderBy两个完全不同的位置(同样来自 SQL 背景),但代码产生了完全相同的结果:

tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.orderBy(desc("End Date DT"))\
.groupBy("End Date DT")\
.count()\
.show()


tripDatawithDT \
.filter(tripData["Subscriber Type"] == "Subscriber")\
.groupBy("End Date DT")\
.count()\
.orderBy(desc("End Date DT"))\
.show()
Run Code Online (Sandbox Code Playgroud)

尽管如此,在其他情况下,由于操作的词法顺序错误,我完全搞乱了我的代码。

104*_*ica 5

TL;DR只要您使用没有自定义优化器的标准开源构建Rules,您就可以假设每个 DSL 操作都会引发一个逻辑子查询,并且所有逻辑优化都与 SQL:2003 标准一致。换句话说,您的 SQL 应该适用于此。

在内部,Spark 表示 SQL 查询的树LogicalPlans,其中每个运算符对应于一个节点,其输入作为子节点。

因此,与 DSL 表达式相对应的未优化逻辑计划由每个运算符的嵌套节点组成(投影、选择、排序、带或不带分组的聚合)。所以给定表

from pyspark.sql.functions import col, desc

t0 = spark.createDataFrame(
    [], "`End Date DT` timestamp, `Subscriber Type` string"
)
t0.createOrReplaceTempView("t0")
Run Code Online (Sandbox Code Playgroud)

第一个查询

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .orderBy(desc("End Date DT")).alias("t2")
    .groupBy("End Date DT")
    .count())
Run Code Online (Sandbox Code Playgroud)

大致相当于*

SELECT `End Date DT`, COUNT(*)  AS count FROM (
    SELECT * FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 ORDER BY `End Date DT` DESC
) as t2 GROUP BY `End Date DT`
Run Code Online (Sandbox Code Playgroud)

尽管

(t0.alias("t0")
    .filter(col("Subscriber Type") == "Subscriber").alias("t1")
    .groupBy("End Date DT")
    .count().alias("t2")
    .orderBy(desc("End Date DT")))
Run Code Online (Sandbox Code Playgroud)

大致相当于**

SELECT * FROM (
    SELECT `End Date DT`, COUNT(*) AS count FROM (
        SELECT * FROM t0 WHERE `Subscriber Type` = 'Subscriber'
    ) as t1 GROUP BY `End Date DT`
) as t2 ORDER BY `End Date DT` DESC
Run Code Online (Sandbox Code Playgroud)

显然,这两个查询并不等效,这反映在它们的优化执行计划中。

ORDER BY之前GROUP BY对应于

== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Run Code Online (Sandbox Code Playgroud)

ORDER BY之后GROUP BY对应于

== Optimized Logical Plan ==
Sort [End Date DT#38 DESC NULLS LAST], true
+- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#84L]
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Run Code Online (Sandbox Code Playgroud)

那么为什么这些可以给出相同的最终结果呢?这是因为在像这里这样的基本情况下,查询规划器会将前面视为ORDER BY应用范围分区的提示,而不是哈希分区。ORDER BY因此,接下来的物理计划GROUP BY将是

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- *(2) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
   +- *(2) Sort [End Date DT#38 DESC NULLS LAST], true, 0
      +- Exchange rangepartitioning(End Date DT#38 DESC NULLS LAST, 200)
         +- *(1) Project [End Date DT#38]
            +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
               +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Run Code Online (Sandbox Code Playgroud)

如果没有ORDER BY*** 则默认为哈希分区

== Physical Plan ==
*(2) HashAggregate(keys=[End Date DT#38], functions=[count(1)])
+- Exchange hashpartitioning(End Date DT#38, 200)
   +- *(1) HashAggregate(keys=[End Date DT#38], functions=[partial_count(1)])
      +- *(1) Project [End Date DT#38]
         +- *(1) Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
            +- Scan ExistingRDD[End Date DT#38,Subscriber Type#39]
Run Code Online (Sandbox Code Playgroud)

因为这发生在规划阶段,这是高影响力的扩展点(特别是对于数据源提供者),所以我认为这是实现的细节,并且不依赖于这种行为的正确性。


* 具有针对 DSL 变体的解析逻辑计划

== Parsed Logical Plan ==
'Aggregate ['End Date DT], [unresolvedalias('End Date DT, None), count(1) AS count#45L]
+- SubqueryAlias `t2`
   +- Sort [End Date DT#38 DESC NULLS LAST], true
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Run Code Online (Sandbox Code Playgroud)

对于 SQL 变体

== Parsed Logical Plan ==
'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#50]
+- 'SubqueryAlias `t2`
   +- 'Sort ['End Date DT DESC NULLS LAST], true
      +- 'Project [*]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
Run Code Online (Sandbox Code Playgroud)

** 具有针对 DSL 变体的解析逻辑计划

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- SubqueryAlias `t2`
   +- Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#59L]
      +- SubqueryAlias `t1`
         +- Filter (Subscriber Type#39 = Subscriber)
            +- SubqueryAlias `t0`
               +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Run Code Online (Sandbox Code Playgroud)

对于 SQL 变体

== Parsed Logical Plan ==
'Sort ['End Date DT DESC NULLS LAST], true
+- 'Project [*]
   +- 'SubqueryAlias `t2`
      +- 'Aggregate ['End Date DT], ['End Date DT, 'COUNT(1) AS count#64]
         +- 'SubqueryAlias `t1`
            +- 'Project [*]
               +- 'Filter ('Subscriber Type = Subscriber)
                  +- 'UnresolvedRelation `t0`
Run Code Online (Sandbox Code Playgroud)

*** IE

== Optimized Logical Plan ==
Aggregate [End Date DT#38], [End Date DT#38, count(1) AS count#70L]
+- Sort [End Date DT#38 DESC NULLS LAST], true
   +- Project [End Date DT#38]
      +- Filter (isnotnull(Subscriber Type#39) && (Subscriber Type#39 = Subscriber))
         +- LogicalRDD [End Date DT#38, Subscriber Type#39], false
Run Code Online (Sandbox Code Playgroud)