在Spark Streaming中,如何检测空批处理?
让我们采用有状态流式字数计算示例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java.是否可以在将新单词添加到流中时打印单词计数RDD?
我是这样做的。创建一个空 RDD,即您的 previousWindow。然后在 forEachRDD 中,计算最后一个窗口和当前窗口之间的差异。如果当前窗口包含前一个窗口中没有的记录,则批处理中有新内容。最后,将前一个窗口设置为当前窗口中的内容。
...
var previousWindowRdd = sc.emptyRDD[String]
dStream.foreachRDD {
windowRdd => {
if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
}
}
...
def processWindow(windowRdd: RDD[String]) = {
val newInBatch = windowRdd.subtract(previousWindowRdd)
if (!newInBatch.isEmpty())
processNewBatch(windowRdd)
previousWindowRdd = windowRdd
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1505 次 |
| 最近记录: |