dex_007 · 5 · apache-spark, spark-structured-streaming
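I'm new to Spark and have been reading about monitoring Spark applications. Basically, I want to know how many records my Spark application processed in a given trigger interval, along with the query progress. I know `lastProgress` provides all of these metrics, but when I use awaitTermination together with `lastProgress`, it always returns null.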
import org.apache.spark.sql.streaming.Trigger

val q4s = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", checkpoint_loc)
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .format("console")
  .start()
println("Query Id: " + q4s.id.toString())
println("QUERY PROGRESS.........")
println(q4s.lastProgress)
q4s.awaitTermination()
Output:
Query Id: efd6bc15-f10c-4938-a1aa-c81fdb2b33e3
QUERY PROGRESS.........
null
How can I get the query's progress while using awaitTermination, or how can I keep the query running continuously without using awaitTermination?
Thanks in advance.
You can create a dedicated thread that continuously prints the last progress of your streaming query.
First, define a Runnable monitoring class that prints the last progress every 10 seconds (10000 ms):
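import java.util.Calendar
import org.apache.spark.sql.streaming.StreamingQuery

// Prints the most recent progress report of `q` every 10 seconds
class StreamingMonitor(q: StreamingQuery) extends Runnable {
  override def run(): Unit = {
    // Loop only while the query runs, so this thread ends together with the query
    while (q.isActive) {
      println("Time: " + Calendar.getInstance().getTime())
      println(q.lastProgress) // null until the first trigger has completed
      Thread.sleep(10000)
    }
  }
}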
Second, start the monitor from your application code as below:
val q4s: StreamingQuery = df.writeStream
  [...]
  .start()
// Run the monitor in its own thread, because awaitTermination() blocks the main thread
new Thread(new StreamingMonitor(q4s)).start()
q4s.awaitTermination()
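If you would rather not manage the `while`/`Thread.sleep` loop yourself, the same polling can be sketched with the JDK's `ScheduledExecutorService` running on a daemon thread, so the monitor never keeps the JVM alive on its own. This is a minimal sketch, not part of the original answer: `ProgressMonitor.startProgressMonitor` is a hypothetical helper, and it takes a generic `() => AnyRef` so that it can be tried without Spark; with Spark you would pass `() => q4s.lastProgress`.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

object ProgressMonitor {
  // Hypothetical helper (not a Spark API): calls `poll` every `periodMs` ms on one
  // daemon thread and hands the result to `report` (println by default).
  def startProgressMonitor(
      poll: () => AnyRef,
      periodMs: Long,
      report: AnyRef => Unit = x => println(x)): ScheduledExecutorService = {
    val daemonFactory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, "progress-monitor")
      t.setDaemon(true) // a daemon thread never keeps the JVM alive by itself
      t
    }
    val scheduler = Executors.newSingleThreadScheduledExecutor(daemonFactory)
    // Per the JDK contract, an exception thrown by one run suppresses all later runs.
    scheduler.scheduleAtFixedRate(() => report(poll()), 0L, periodMs, TimeUnit.MILLISECONDS)
    scheduler
  }
}
```

In the streaming application this would be `val monitor = ProgressMonitor.startProgressMonitor(() => q4s.lastProgress, 10000L)` before `q4s.awaitTermination()`, followed by `monitor.shutdown()` once the query has ended.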
You could also have a while loop on the status of the query:
val q4s: StreamingQuery = df.writeStream
  [...]
  .start()
// Poll the progress on the main thread for as long as the query is active
while (q4s.isActive) {
  println(q4s.lastProgress)
  Thread.sleep(10000)
}
q4s.awaitTermination()

An alternative solution for monitoring your streaming query is to use a StreamingQueryListener. Again, first define a class extending StreamingQueryListener:
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

class MonitorListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  // Called whenever the query reports a new progress update
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    println(s"numInputRows: ${event.progress.numInputRows}")
    println(s"processedRowsPerSecond: ${event.progress.processedRowsPerSecond}")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
then register it with your SparkSession:
spark.streams.addListener(new MonitorListener)
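Since the question asks how many records each trigger processed, a listener can also store those numbers instead of only printing them. Spark's documentation notes that listener methods may be called from different threads, so whatever `onQueryProgress` writes should be safe to read from your main thread. The sketch below is a hypothetical, Spark-free helper (`BatchRowCounter` is not a Spark class) that `onQueryProgress` could feed with `event.progress.batchId` and `event.progress.numInputRows`.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (not a Spark API): thread-safe record of input rows per batch.
final class BatchRowCounter {
  private val rowsByBatch = new ConcurrentHashMap[java.lang.Long, java.lang.Long]()

  // Call from onQueryProgress, e.g. record(event.progress.batchId, event.progress.numInputRows).
  // A later report for the same batch id replaces the earlier one.
  def record(batchId: Long, numInputRows: Long): Unit =
    rowsByBatch.put(java.lang.Long.valueOf(batchId), java.lang.Long.valueOf(numInputRows))

  // Rows reported for one batch, if any report has arrived for it
  def rowsFor(batchId: Long): Option[Long] =
    Option(rowsByBatch.get(java.lang.Long.valueOf(batchId))).map(_.longValue)

  // Sum of the rows over all recorded batches
  def totalRows: Long = {
    var sum = 0L
    rowsByBatch.values.forEach(v => sum += v.longValue)
    sum
  }
}
```

Keeping one entry per batch id (rather than a running sum) means a repeated report for the same batch does not get counted twice.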
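You have to start a separate thread that holds a reference to the streaming query to monitor (e.g. q4s) and pulls the progress periodically.
The thread that starts the query (the main thread of a Spark Structured Streaming application) usually blocks in awaitTermination, so that the daemon threads of the streaming queries it started can keep running.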
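To see why the question's code printed `null` once and then nothing else, here is a Spark-free analogy of that threading model. `FakeQuery` is a hypothetical class, not Spark code: a daemon worker thread plays the role of the streaming query, and its `lastProgress` is null until the first simulated trigger finishes, just as Spark's `lastProgress` is null before the first progress update exists. The caller reads it once right after `start()` and then blocks in `awaitTermination()`, so no one reads it again unless another thread polls.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a streaming query, NOT Spark code: a daemon thread
// "runs" a fixed number of triggers and publishes a progress string after each.
final class FakeQuery(triggers: Int, triggerMs: Long) {
  // Like Spark's lastProgress, this is null until the first trigger has finished.
  private val last = new AtomicReference[String](null)

  private val worker = new Thread(new Runnable {
    def run(): Unit =
      for (i <- 0 until triggers) {
        Thread.sleep(triggerMs) // simulated trigger work
        last.set(s"batch $i done")
      }
  })
  worker.setDaemon(true) // would be killed if the main thread simply returned

  def start(): FakeQuery = { worker.start(); this }
  def lastProgress: String = last.get()
  def isActive: Boolean = worker.isAlive
  // Blocks the calling thread until the worker finishes, as awaitTermination() does.
  def awaitTermination(): Unit = worker.join()
}
```

Reading `lastProgress` immediately after `start()` gives `null`; only after at least one trigger has completed does it return a value, which is why the monitoring has to happen on a separate thread (or in a polling loop) rather than once before `awaitTermination()`.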