如何使用外部触发器停止结构化流式查询?

Lit*_*chy 4 apache-spark spark-structured-streaming

我正在使用 spark 结构化流媒体,我想检查是否stop存在文件以退出我的程序。

我可以做这样的事情:

def main(args: Array[String]) = {
    val query = SparkSession...load.writeStream.foreachBatch{
      if (stop file exist) exit(0)
      // do some processing here
    }.start()
    // add Execute Listener here to listen query
    query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)

但是,这只能在有新行附加到此查询表时触发。如果没有新行,该stop文件将没有任何影响。

实现这个触发器有什么更好的主意吗?


以上是问题,感谢下面接受的答案,这是我最终运行良好的代码。

object QueryManager {
  def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
    override def run() = {if(stop condition) query.stop()}
  }
  def listenTermination(query: StreamingQuery) = {
    Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
      queryTerminator(query), initialDelay=1, delay=1, SECONDS
    )
  }
}
// and in main method
def main(args: Array[String]) = {
    val query = SparkSession...load.writeStream.foreachBatch{      
      // do some processing here
    }.start()
    // add Execute Listener here to listen query
    QueryManager.listenTermination(query)

    query.awaitTermination()


    // I am not familar with scala, 
    // but it seems would not exit if we do not add this
    System.exit(0) 
}
Run Code Online (Sandbox Code Playgroud)

如果有任何错误,请告诉我。

Jac*_*ski 8

实现这个触发器有什么更好的主意吗?

流查询是结构化流应用程序的单独守护线程。它会一直运行,直到使用StreamingQuery.stop停止。

至少有两种方法可以访问正在运行的流查询:

  1. DataStreamWriter.start()
  2. 流查询管理器

这个想法是在您的结构化流应用程序中有一个“控制线程”,它将侦听停止请求(带有一个或多个流查询的 ID)并简单地stop对正在运行的流查询执行。


将 Spark Structured Streaming 应用程序视为具有多个线程的单 JVM 应用程序。您可以再有一个来控制线程。这就是基本的想法。