Ley*_*h G 10 scala apache-spark spark-structured-streaming
我有一个运行良好的结构化流设置,但我希望在它运行时监视它.
我已经构建了一个EventCollector
class EventCollector extends StreamingQueryListener{
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println("Start")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
println(event.queryStatus.prettyJson)
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println("Term")
}
Run Code Online (Sandbox Code Playgroud)
我已经构建了一个EventCollector并将监听器添加到我的spark会话中
val listener = new EventCollector()
spark.streams.addListener(listener)
Run Code Online (Sandbox Code Playgroud)
然后我解除了查询
val query = inputDF.writeStream
//.format("console")
.queryName("Stream")
.foreach(writer)
.start()
query.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
但是,onQueryProgress永远不会被击中.onQueryStarted确实如此,但我希望以一定的时间间隔获取查询的进度,以监控查询的执行情况.任何人都可以协助吗?
经过对这个主题的大量研究,这就是我发现的......
OnQueryProgress 在查询之间被命中。我不确定这个有意的功能与否,但是当我们从文件流式传输数据时, OnQueryProgress 不会触发。
我发现的一个解决方案是依靠 foreach 编写器接收器并在进程函数中执行我自己的性能分析。不幸的是,我们无法访问有关正在运行的查询的特定信息。或者,我还没有想出如何去做。这是我在沙箱中实现的性能分析:
val writer = new ForeachWriter[rawDataRow] {
def open(partitionId: Long, version: Long):Boolean = {
//We end up here in between files
true
}
def process(value: rawDataRow) = {
counter += 1
if(counter % 1000 == 0) {
val currentTime = System.nanoTime()
val elapsedTime = (currentTime - startTime)/1000000000.0
println(s"Records Written: $counter")
println(s"Time Elapsed: $elapsedTime seconds")
}
}
}
Run Code Online (Sandbox Code Playgroud)
另一种获取指标的方法:
另一种获取有关正在运行的查询的信息的方法是点击 spark 提供给我们的 GET 端点。
或者
文档在这里:http : //spark.apache.org/docs/latest/monitoring.html
2017 年 9 月 2 日更新: 在常规火花流上测试,而不是结构化流
免责声明,这可能不适用于结构化流媒体,我需要设置一个测试台来确认。但是,它确实适用于常规火花流(在此示例中从 Kafka 消费)。
我相信,自从 Spark Streaming 2.2 发布以来,存在新的端点,可以检索有关流性能的更多指标。这可能存在于以前的版本中,我只是错过了它,但我想确保它被记录下来,供其他搜索此信息的人使用。
http://localhost:4040/api/v1/applications/ {applicationIdHere}/streaming/statistics
这是看起来像是在 2.2 中添加的端点(或者它已经存在并且只是添加了文档,我不确定,我还没有检查过)。
无论如何,它为指定的流应用程序添加了这种格式的指标:
{
"startTime" : "2017-09-13T14:02:28.883GMT",
"batchDuration" : 1000,
"numReceivers" : 0,
"numActiveReceivers" : 0,
"numInactiveReceivers" : 0,
"numTotalCompletedBatches" : 90379,
"numRetainedCompletedBatches" : 1000,
"numActiveBatches" : 0,
"numProcessedRecords" : 39652167,
"numReceivedRecords" : 39652167,
"avgInputRate" : 771.722,
"avgSchedulingDelay" : 2,
"avgProcessingTime" : 85,
"avgTotalDelay" : 87
}
Run Code Online (Sandbox Code Playgroud)
这使我们能够使用 Spark 公开的 REST 端点构建我们自己的自定义指标/监控应用程序。
归档时间: |
|
查看次数: |
2561 次 |
最近记录: |