为什么即使没有新数据可用,Spark Streaming 也会执行 foreachRDD?

osk*_*osk 1 scala apache-spark spark-streaming

我有以下 Spark 流示例:

val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))

val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
  println(file.count())
})

ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

即使文件夹为空,它也会每 2 秒打印 0,就像文件夹中存在空文件一样。我希望它只foreachRDD在文件夹中存在新文件时才进入。有什么我做错了吗?

我使用的是 Spark 1.6 和 Scala 2.10.7。

小智 5

由于您的批处理持续时间为 2 秒,作业将每 2 秒触发一次,基本上触发点不是数据可用性,而是批处理持续时间,如果 DStream 时存在的数据包含数据,否则它将为空(使用下面的代码来检查相同的)

 dstream.foreachRDD{ rdd => if (!rdd.isEmpty) {// do something } }
Run Code Online (Sandbox Code Playgroud)