Spark SQL where 子句中谓词的求值顺序

Mr *_* T. 5 query-optimization apache-spark apache-spark-sql

我试图了解 Spark SQL 中谓词求值的顺序,以提高查询的性能。
假设我有以下查询

"select * from tbl where pred1 and pred2"
Run Code Online (Sandbox Code Playgroud)

假设没有一个谓词符合下推过滤器的条件(为了简化)。另外,我们假设计算上比(假设正则表达式模式匹配与否定)pred1复杂得多。pred2

  • 有什么方法可以验证 Sparkpred2之前 是否会评估pred1
  • 这是确定性的吗?
  • 这是可控的吗?
  • 有什么办法可以看到最终的执行计划吗?

the*_*tom 3

一般的

\n

好问题。

\n

通过测试场景并因找不到合适的文档而进行推论来推断答案。第二次尝试由于网络上的各种声明无法备份。

\n
\n

我认为这个问题与 AQE Spark 3.x 方面无关,而是\n关于作为 Spark 应用程序第 N 阶段一部分的数据帧,该数据帧已经\n通过了从静态源获取数据的阶段,这是\n主题应用多个谓词进行过滤。

\n

那么中心点是谓词的排序方式是否重要,或者 Spark (Catalyst) 是否重新排序谓词以最小化\n要完成的工作?

\n
    \n
  • 这里的前提是,首先过滤掉最大数量的数据比评估过滤掉很少\n的谓词更有意义。\n
      \n
    • 这是一个著名的 RDBMS 点,涉及可控制谓词(受定义随时间的演变的影响)。\n
        \n
      • 很多讨论都集中在索引上,Spark、Hive 没有这个,但 DF 是柱状的。
      • \n
      \n
    • \n
    \n
  • \n
\n
\n
\n

第1点

\n

你可以尝试%sql

\n
 EXPLAIN EXTENDED select k, sum(v) from values (1, 2), (1, 3) t(k, v) group by k;\n
Run Code Online (Sandbox Code Playgroud)\n

从这里您可以看到如果重新排列谓词会发生什么,但我在 Databricks 上的非 AQE 模式的物理计划中没有看到这样的方面。请参阅\n https://docs.databricks.com/sql/language-manual/sql-ref-syntax-qry-explain.html

\n

Catalyst 可以重新安排我在这里和那里读过的过滤。到什么程度,需要进行大量的研究;我无法证实这一点。

\n

还有一个有趣的读物:\n https://www.waitingforcode.com/apache-spark-sql/catalyst-optimizer-in-spark-sql/read

\n
\n
\n

第2点

\n

我使用相同\n功能查询运行了以下可悲的人为示例,但谓词相反,使用\n具有高基数的列并测试了实际上不存在的值\n然后比较了 UDF 中使用的累加器的计数当被叫时。

\n
\n

场景1

\n
import org.apache.spark.sql.functions._\n\ndef randomInt1to1000000000 = scala.util.Random.nextInt(1000000000)+1\ndef randomInt1to10 = scala.util.Random.nextInt(10)+1\ndef randomInt1to1000000 = scala.util.Random.nextInt(1000000)+1\n\nval df = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000,randomInt1to1000000000,randomInt1to10)}).toDF("nuid","hc", "lc").withColumn("text", lpad($"nuid", 3, "0")).withColumn("literal",lit(1)) \n\nval accumulator = sc.longAccumulator("udf_call_count")\n\nspark.udf.register("myUdf", (x: String) => {accumulator.add(1)\n                                            x.length}\n                  )  \n\naccumulator.reset()\ndf.where("myUdf(text) = 3 and hc = -4").select(max($"text")).show(false)\nprintln(s"Number of UDF calls ${accumulator.value}")  \n
Run Code Online (Sandbox Code Playgroud)\n

返回:

\n
+---------+\n|max(text)|\n+---------+\n|null     |\n+---------+\n\nNumber of UDF calls 1000000 \n
Run Code Online (Sandbox Code Playgroud)\n

场景2

\n
import org.apache.spark.sql.functions._\n\ndef randomInt1to1000000000 = scala.util.Random.nextInt(1000000000)+1\ndef randomInt1to10 = scala.util.Random.nextInt(10)+1\ndef randomInt1to1000000 = scala.util.Random.nextInt(1000000)+1\n\nval dfA = sc.parallelize(Seq.fill(1000000){(randomInt1to1000000,randomInt1to1000000000,randomInt1to10)}).toDF("nuid","hc", "lc").withColumn("text", lpad($"nuid", 3, "0")).withColumn("literal",lit(1)) \n\nval accumulator = sc.longAccumulator("udf_call_count")\n\nspark.udf.register("myUdf", (x: String) => {accumulator.add(1)\n                                            x.length}\n                  )  \n\naccumulator.reset()\ndfA.where("hc = -4 and myUdf(text) = 3").select(max($"text")).show(false)\nprintln(s"Number of UDF calls ${accumulator.value}")\n
Run Code Online (Sandbox Code Playgroud)\n

返回:

\n
+---------+\n|max(text)|\n+---------+\n|null     |\n+---------+\n\nNumber of UDF calls 0\n
Run Code Online (Sandbox Code Playgroud)\n

我的结论是:

\n
    \n
  • 在这种情况下,存在从左到右的评估,因为场景 2 的累加器值为 0,因此有 0 个对 udf 的调用,而场景 1 注册了 1M 个调用。

    \n
  • \n
  • 因此,ORACLE 和 DB2 可能对第 1 阶段谓词执行的谓词处理顺序并不适用。

    \n
  • \n
\n
\n

第3点

\n

然而,我从手册中注意到\n https://docs.databricks.com/spark/latest/spark-sql/udf-scala.html\n以下内容:

\n

评估顺序和空检查

\n

Spark SQL(包括 SQL 以及 DataFrame 和 Dataset API)\n不保证子表达式的求值顺序。特别是,运算符或函数的输入不一定按从左到右或任何其他固定顺序进行计算。例如,逻辑 AND\n 和 OR 表达式不具有从左到右 \xe2\x80\x9cshort-circing\xe2\x80\x9d\n 语义。

\n

因此,依赖布尔表达式的副作用或顺序以及 WHERE 和 HAVING 子句的顺序是危险的,因为此类表达式和子句可以在查询优化和规划期间重新排序。具体来说,如果 UDF 依赖于 SQL 中的短路语义进行 null 检查,则无法保证在调用 UDF 之前进行 null 检查。例如\n

\n
spark.udf.register("strlen", (s: String) => s.length)\nspark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee\n
Run Code Online (Sandbox Code Playgroud)\n

此 WHERE 子句不保证在过滤掉空值之后\n调用 strlen UDF。

\n

要执行正确的 null 检查,我们建议您执行以下任一操作:\n

\n

使 UDF 本身能够识别空值并在 UDF 本身内部执行空值检查 使用 IF 或 CASE WHEN 表达式执行空值检查并在条件分支中调用 UDF。

\n
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)\nspark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok\nspark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok\n
Run Code Online (Sandbox Code Playgroud)\n

有点矛盾。

\n
\n