在Spark Streaming中,如何检测空批处理?

Pin*_*nch 5 apache-spark

在Spark Streaming中,如何检测空批处理?

让我们采用有状态流式字数计算示例:https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java.是否可以在将新单词添加到流中时打印单词计数RDD?

Kev*_*uli 2

我是这样做的。创建一个空 RDD,即您的 previousWindow。然后在 forEachRDD 中,计算最后一个窗口和当前窗口之间的差异。如果当前窗口包含前一个窗口中没有的记录,则批处理中有新内容。最后,将前一个窗口设置为当前窗口中的内容。

  ...

  var previousWindowRdd = sc.emptyRDD[String]

  dStream.foreachRDD {
    windowRdd => {
      if (!windowRdd.isEmpty) processWindow(windowRdd.cache())
    }
  }

  ...

def processWindow(windowRdd: RDD[String]) = {
  val newInBatch = windowRdd.subtract(previousWindowRdd)

  if (!newInBatch.isEmpty())
    processNewBatch(windowRdd)

  previousWindowRdd = windowRdd
}
Run Code Online (Sandbox Code Playgroud)