为什么变换在结构化流中只进行一次副作用(println)?

T. *_*eke 5 scala apache-spark apache-spark-sql spark-structured-streaming

为什么select声明是每批打印的,但hello world只打印一次?

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) ::
  StructField("score", DoubleType, nullable = false) :: Nil)

val in: DataFrame = sparkSession.readStream
 .schema(schema)
 .format("csv")
 .option("header", false)
 .option("maxFilesPerTrigger", 1)
 .option("delimiter", ";")
 .load("s3://xxxxxxxx")

val input: DataFrame = in.select("*")
 .transform { ds =>
   println("hello world")  // <-- Why is this printed out once?
   ds
}

import org.apache.spark.sql.streaming.StreamingQuery
val query: StreamingQuery = input.writeStream
  .format("console")
  .start
Run Code Online (Sandbox Code Playgroud)

Jac*_*ski 9

Spark 2.1.0-SNAPSHOT在这里(今天建成),但我相信它在2.0和现在之间没有变化.

$ ./bin/spark-submit --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0-SNAPSHOT
      /_/

Branch master
Compiled by user jacek on 2016-09-30T07:08:39Z
Revision 1fad5596885aab8b32d2307c0edecbae50d5bd7a
Url https://github.com/apache/spark.git
Type --help for more information.
Run Code Online (Sandbox Code Playgroud)

在Spark的结构化流中,您的流应用程序只是将相同的物理查询计划应用于输入数据源的技巧.

请注意,物理查询计划是你的最佳选择Dataset(我对Spark SQL的了解越多,我认为查询和数据集之间没有区别 - 这些日子它们可以简单地互换).

当您描述结构化查询(无论是一次性还是流式查询)时,它会经历4个阶段的分析,分析,优化并最终生成物理计划.您可以使用explain(extended = true)方法查看它.

scala> input.explain(extended = true)
== Parsed Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Analyzed Logical Plan ==
id: bigint, name: string, score: double
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Optimized Logical Plan ==
StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5c4f07c1,json,List(),Some(StructType(StructField(id,LongType,false), StructField(name,StringType,false), StructField(score,DoubleType,false))),List(),None,Map(path -> input-json)), FileSource[input-json], [id#15L, name#16, score#17]

== Physical Plan ==
StreamingRelation FileSource[input-json], [id#15L, name#16, score#17]
Run Code Online (Sandbox Code Playgroud)

这些阶段是懒惰的,只执行一次.

一旦获得物理计划,阶段将不会再次执行.您的Dataset管道已经计算完毕,唯一缺失的部分是流经管道的数据.

这就是为什么你只看一次"hello world" - 当流式查询计划被"执行"以产生物理计划时.它被执行一次并针对处理源进行了优化Dataset(并且只有Dataset因此已经触发了任何副作用).

一个有趣的案例.把它带到这里真是太棒了!