How do I get the progress of a streaming query after awaitTermination?

dex*_*007 5 apache-spark spark-structured-streaming

I am new to Spark and am reading up on monitoring Spark applications. Basically, I want to know how many records a Spark application processed in a given trigger interval, and the query's progress. I know that "lastProgress" gives all these metrics, but when I use awaitTermination together with "lastProgress" it always returns null.

 val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()

println("Query Id: " + q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress)
q4s.awaitTermination()

输出:

Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null

How can I get the query's progress while using awaitTermination, or how can I keep the query running continuously without using awaitTermination?

Thanks in advance.

mik*_*ike 6

Using dedicated runnable thread

You can create a dedicated Thread continuously printing the last progress of your streaming query.

First, define a runnable monitoring class which prints out the last progress every 10 seconds (10000 ms):

import java.util.Calendar
import org.apache.spark.sql.streaming.StreamingQuery

class StreamingMonitor(q: StreamingQuery) extends Runnable {
  override def run(): Unit = {
    while (true) {
      println("Time: " + Calendar.getInstance().getTime())
      println(q.lastProgress)
      Thread.sleep(10000)
    }
  }
}

Second, implement this into your application code as below:

val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

new Thread(new StreamingMonitor(q4s)).start()

q4s.awaitTermination()

Looping over query status

You could also poll the query's status in a while loop on the main thread:

val q4s: StreamingQuery = df.writeStream
  [...]
  .start()

while(q4s.isActive) {
  println(q4s.lastProgress)
  Thread.sleep(10000)
}

q4s.awaitTermination()

Alternative Solution using StreamingQueryListener

An alternative solution to monitor your streaming query would be to use a StreamingQueryListener. First, define a class extending StreamingQueryListener:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

class MonitorListener extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"numInputRows: ${event.progress.numInputRows}")
    println(s"processedRowsPerSecond: ${event.progress.processedRowsPerSecond}")
  }

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { }
}

Then register it with your SparkSession:

spark.streams.addListener(new MonitorListener())
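One caveat worth noting, whichever approach you choose: lastProgress stays null until the first micro-batch completes (which is why the question's code prints null), so a polling loop should guard against it. A minimal sketch, assuming the q4s query from above:

```scala
// Poll progress safely: lastProgress is null before the first completed batch.
while (q4s.isActive) {
  val p = q4s.lastProgress
  if (p != null) {
    // StreamingQueryProgress exposes per-batch metrics such as batchId and numInputRows.
    println(s"batch ${p.batchId}: ${p.numInputRows} rows")
  }
  Thread.sleep(10000)
}
```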


Jac*_*ski 2

You have to start a separate thread, with a reference to the streaming query to monitor (e.g. q4s), and pull the progress periodically.

The thread that starts the query (the main thread of a Spark Structured Streaming application) usually calls awaitTermination, so that the daemon threads of the streaming queries it started can keep running.
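This pattern can be sketched as follows. A minimal sketch, assuming a started query named q4s; startProgressMonitor is a hypothetical helper name, not a Spark API:

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: spawn a daemon thread that periodically pulls the
// query's progress while the main thread blocks in awaitTermination.
def startProgressMonitor(q: StreamingQuery, intervalMs: Long = 10000): Thread = {
  val t = new Thread(new Runnable {
    override def run(): Unit = {
      while (q.isActive) {
        println(q.lastProgress) // null until the first micro-batch completes
        Thread.sleep(intervalMs)
      }
    }
  })
  t.setDaemon(true) // don't keep the JVM alive once the main thread exits
  t.start()
  t
}

// Usage: start the monitor, then block the main thread as usual.
// startProgressMonitor(q4s)
// q4s.awaitTermination()
```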