osk*_*osk 1 scala apache-spark spark-streaming
我有以下 Spark 流示例:
val conf = new SparkConf().setAppName("Name").setMaster("local")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(2))
val directoryStream = ssc.textFileStream("""file:///C:/Users/something/something""")
directoryStream.foreachRDD(file => {
println(file.count())
})
ssc.start()
ssc.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
即使文件夹为空,它也会每 2 秒打印 0,就像文件夹中存在空文件一样。我希望它只foreachRDD在文件夹中存在新文件时才进入。有什么我做错了吗?
我使用的是 Spark 1.6 和 Scala 2.10.7。
小智 5
由于您的批处理持续时间为 2 秒,作业将每 2 秒触发一次,基本上触发点不是数据可用性,而是批处理持续时间,如果 DStream 时存在的数据包含数据,否则它将为空(使用下面的代码来检查相同的)
dstream.foreachRDD{ rdd => if (!rdd.isEmpty) {// do something } }
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
674 次 |
| 最近记录: |