使用 Spark Catalyst 逻辑计划修改查询

use*_*897 1 apache-spark apache-spark-sql

是否可以使用扩展点在 DataFrame API/SQL 中添加/替换现有列表达式。

例如:假设我们注入解析规则,该规则可以检查计划中的项目节点,并在检查“名称”列时,将其替换为 upper(name)。

使用扩展点可以实现这样的事情吗?我发现的例子大多很简单,它们没有按照我需要的方式操作输入表达式。

请告诉我这是否可能。

mor*_*007 5

是的,这是可能的。

让我们举个例子。假设我们要编写一个规则来检查 Project 运算符,如果该项目针对某个特定列(例如“column2”),则将其乘以 2。

import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._

object DoubleColumn2OptimizationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
        case p: Project =>
          if (p.projectList.filter(_.name == "column2").size >= 1) {
              val newList = p.projectList.map { case x =>
                if (x.name == "column2") {
                  Alias(Multiply(Literal(2, IntegerType), x), "column2_doubled")()
                } else {
                  x
                }
              }
              p.copy(projectList = newList)
          } else {
              p
          }
    }
}
Run Code Online (Sandbox Code Playgroud)

假设我们有一个表“table1”,其中有两列 - column1、column2。

如果没有这条规则——

> spark.sql("select column2 from table1 limit 10").collect()
Array([1], [2], [3], [4], [5], [6], [7], [8], [9], [10])
Run Code Online (Sandbox Code Playgroud)

有了这个规则 -

> spark.experimental.extraOptimizations =  Seq(DoubleColumn2OptimizationRule)
> spark.sql("select column2 from table1 limit 10").collect()
Array([2], [4], [6], [8], [10], [12], [14], [16], [18], [20])
Run Code Online (Sandbox Code Playgroud)

您也可以在 DataFrame 上调用解释来检查计划 -

> Spark.sql("从表 1 限制 10 中选择列 2").explain
== 物理计划 ==
收集限制 10
+- *(1) 本地限制 10
   +- *(1) 项目 [(2 * column2#213) AS column2_doubled#214]
      +- HiveTableScan [column2#213],HiveTableRelation `default`.`table1`,org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,[column1#212,column2#213]