What is the difference between SparkSession.sql and Dataset.sqlContext.sql?

use*_*400 3 apache-spark apache-spark-sql

I have the following code snippets, and I'm wondering what the difference between the two is and which one I should use. I'm using Spark 2.2.

Dataset<Row> df = sparkSession.readStream()
    .format("kafka")
    // kafka.bootstrap.servers and subscribe (or assign/subscribePattern)
    // are required options for the kafka source; omitted here for brevity
    .load();

df.createOrReplaceTempView("table");
df.printSchema();

Dataset<Row> resultSet =  df.sqlContext().sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();

vs.

Dataset<Row> df = sparkSession.readStream()
    .format("kafka")
    // kafka.bootstrap.servers and subscribe (or assign/subscribePattern)
    // are required options for the kafka source; omitted here for brevity
    .load();

df.createOrReplaceTempView("table");

Dataset<Row> resultSet =  sparkSession.sql("select value from table"); //sparkSession.sql(this.query);
StreamingQuery streamingQuery = resultSet
        .writeStream()
        .trigger(Trigger.ProcessingTime(1000))
        .format("console")
        .start();

Jac*_*ski 8

There is a very subtle difference between sparkSession.sql("sql query") and df.sqlContext().sql("sql query").

Please note that you can have zero, two or more SparkSessions in a single Spark application (but it's assumed you'll have at least one SparkSession in a Spark SQL application).

Please also note that a Dataset is bound to the SparkSession it was created within, and that SparkSession will never change.

You may be wondering why anyone would want that, but it gives you a boundary between queries: you can use the same table names for different datasets, which is actually a very powerful feature of Spark SQL.

The following example shows the difference and hopefully will give you an idea of why it's powerful.

scala> spark.version
res0: String = 2.3.0-SNAPSHOT

scala> :type spark
org.apache.spark.sql.SparkSession

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> val df = spark.range(5)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]

scala> df.sqlContext.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> val anotherSession = spark.newSession
anotherSession: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@195c5803

scala> anotherSession.range(10).createOrReplaceTempView("new_table")

scala> anotherSession.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        |new_table|       true|
+--------+---------+-----------+


scala> df.sqlContext.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+
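
To tie this back to your question: in both of your snippets there is only one SparkSession, and df was created by it, so df.sqlContext().sql(...) and sparkSession.sql(...) consult the same temporary-view catalog and behave identically. Continuing the spark-shell demo above sketches this (the view name my_table is arbitrary, chosen just for this example):

scala> df.createOrReplaceTempView("my_table")

scala> // the view is visible both through the session and through the Dataset's sqlContext
scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | my_table|       true|
+--------+---------+-----------+

scala> df.sqlContext.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|        | my_table|       true|
+--------+---------+-----------+

The difference only starts to matter once newSession (or multiple sessions) come into play, as in the anotherSession example above.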