Lit*_*chy 4 apache-spark spark-structured-streaming
我正在使用 spark 结构化流媒体,我想检查是否stop存在文件以退出我的程序。
我可以做这样的事情:
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
if (stop file exist) exit(0)
// do some processing here
}.start()
// add Execute Listener here to listen query
query.awaitTermination()
}
Run Code Online (Sandbox Code Playgroud)
但是,这只能在有新行附加到此查询表时触发。如果没有新行,该stop文件将没有任何影响。
实现这个触发器有什么更好的主意吗?
以上是问题,感谢下面接受的答案,这是我最终运行良好的代码。
object QueryManager {
def queryTerminator(query: StreamingQuery): Runnable = new Runnable {
override def run() = {if(stop condition) query.stop()}
}
def listenTermination(query: StreamingQuery) = {
Executors.newSingleThreadScheduledExecutor.scheduleWithFixedDelay(
queryTerminator(query), initialDelay=1, delay=1, SECONDS
)
}
}
// and in main method
def main(args: Array[String]) = {
val query = SparkSession...load.writeStream.foreachBatch{
// do some processing here
}.start()
// add Execute Listener here to listen query
QueryManager.listenTermination(query)
query.awaitTermination()
// I am not familar with scala,
// but it seems would not exit if we do not add this
System.exit(0)
}
Run Code Online (Sandbox Code Playgroud)
如果有任何错误,请告诉我。
实现这个触发器有什么更好的主意吗?
流查询是结构化流应用程序的单独守护线程。它会一直运行,直到使用StreamingQuery.stop停止。
至少有两种方法可以访问正在运行的流查询:
这个想法是在您的结构化流应用程序中有一个“控制线程”,它将侦听停止请求(带有一个或多个流查询的 ID)并简单地stop对正在运行的流查询执行。
将 Spark Structured Streaming 应用程序视为具有多个线程的单 JVM 应用程序。您可以再有一个来控制线程。这就是基本的想法。
| 归档时间: |
|
| 查看次数: |
684 次 |
| 最近记录: |