每次执行查询时如何避免查询准备(解析、规划和优化)?

min*_*imo 5 apache-spark spark-streaming apache-spark-sql

在我们的 Spark 流应用程序中,使用 60 秒的批处理,我们在 DF 上创建一个临时表,然后对其运行大约 80 个查询,例如:

sparkSession.sql("select ... from temp_view group by ...")
Run Code Online (Sandbox Code Playgroud)

但考虑到这些是相当繁重的查询,大约有 300 个汇总列,如果我们不必分析 sql 并为每个微批处理生成查询计划,那就太好了。

没有办法生成、缓存和重用查询计划吗?即使每次查询仅节省 50 毫秒,每批也将为我们节省大约 4 秒。

我们在 CDH/YARN 上使用 Spark 2.2。谢谢。

Jac*_*ski 2

我以前没有尝试过,但是“要生成、缓存和重用查询计划”,您应该简单地(重新)使用查询(它可能不一定是您通常使用的“形状”,但有一个可能适合您的情况)。

(出声思考)

每个结构化查询(无论是数据集、数据帧还是 SQL)都会经历解析、分析、逻辑优化、规划和物理优化等阶段。

结构化查询由其​​计划描述,优化的物理查询计划是您可以使用Dataset.explain看到的计划:

explain(): Unit将物理计划打印到控制台以进行调试。

scala> spark.version
res0: String = 2.3.1-SNAPSHOT

scala> :type q
org.apache.spark.sql.DataFrame

scala> q.explain
== Physical Plan ==
*(1) Project [id#0L, (id#0L * 2) AS x2#2L]
+- *(1) Range (0, 4, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)

您不直接使用计划,但重点是您可以。另一个重要的一点是,计划通常对它们所优化的数据集一无所知(我说通常是因为 Spark SQL 有一个基于成本的优化器,它与数据一起工作以提供可能的最优化查询计划)。

每当执行操作时,查询都会通过所谓的结构化查询执行管道。每次执行操作时它都会进行“预处理”(即使是相同的操作)。这就是为什么您可以缓存结果,但这会将查询与数据永远捆绑在一起(您希望避免这种情况)。

话虽如此,我认为您可以在调用操作之前进行优化(并通过查询的“管道”泵送数据)。只需使用您可以生成的优化物理查询计划,QueryExecution.rdd该计划将为您提供代表结构化查询的 RDD。使用该 RDD,您可以简单地RDD.[theAction]每个批处理间隔,从而避免结构化查询成为 RDD 所经历的所有阶段。

scala> q.rdd
res2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[4] at rdd at <console>:26
Run Code Online (Sandbox Code Playgroud)

您甚至可以通过使用来“优化” RDD QueryExecution.toRdd

scala> q.queryExecution.toRdd
res4: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = MapPartitionsRDD[7] at toRdd at <console>:26
Run Code Online (Sandbox Code Playgroud)

但是(再次,大声思考)所有这些重用都会自动发生,因为阶段是惰性 val,所以只是...不,它无法工作...忽略最后一个“但是”并坚持重用底层 RDD 的想法:)它应该有效。


顺便说一句,这几乎就是 Spark Structured Streaming 用于通过微批处理执行每个批处理(间隔)的方法。但在 2.3 中,情况发生了变化。