我有以下作为一个例子:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
df.count
Run Code Online (Sandbox Code Playgroud)
我知道我可以使用Spark上下文监视作业,使用SparkListener; 然而,这给了我所有工作的事件(我不能使用,因为我不知道工作ID).
如何才能获得"计数"操作的进度?
正如评论中已经建议的那样,可以使用Spark UI 的 REST API来收集所需的数字。
主要问题是确定您感兴趣的阶段。从代码到阶段不存在 1:1 的映射。例如,单个计数将触发两个阶段(一个阶段用于计算数据帧每个分区中的元素,第二个阶段用于总结第一阶段的结果)。阶段通常获取触发其执行的操作的名称,尽管这可能会在代码中更改。
我们可以创建一种方法,查询 REST API 中具有特定名称的所有阶段,然后将这些阶段的所有任务数以及已完成的任务数相加。假设所有任务大约花费相似的执行时间(如果数据集具有倾斜分区,则该假设是错误的),可以使用已完成任务的份额作为作业进度的衡量标准。
def countTasks(sparkUiUrl: String, stageName: String): (Int, Int) = {
import scala.util.parsing.json._
import scala.collection.mutable.ListBuffer
def get(url: String) = scala.io.Source.fromURL(url).mkString
//get the ids of all running applications and collect them in a ListBuffer
val applications = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications?staus=running"))
val apps: ListBuffer[String] = new scala.collection.mutable.ListBuffer[String]
applications match {
case Some(l: List[Map[String, String]]) => l.foreach(apps += _ ("id"))
case other => println("Unknown data structure while reading applications: " + other)
}
var countTasks: Int = 0;
var countCompletedTasks: Int = 0;
//get the stages for each application and sum up the number of tasks for each stage with the requested name
apps.foreach(app => {
val stages = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications/" + app + "/stages"))
stages match {
case Some(l: List[Map[String, Any]]) => l.foreach(m => {
if (m("name") == stageName) {
countTasks += m("numTasks").asInstanceOf[Double].toInt
countCompletedTasks += m("numCompleteTasks").asInstanceOf[Double].toInt
}
})
case other => println("Unknown data structure while reading stages: " + other)
}
})
//println(countCompletedTasks + " of " + countTasks + " tasks completed")
(countTasks, countCompletedTasks)
}
Run Code Online (Sandbox Code Playgroud)
针对给定的计数示例调用此函数
println(countTasks("http://localhost:4040", "show at CountExample.scala:16"))
Run Code Online (Sandbox Code Playgroud)
将打印出两个数字:第一个是所有任务的数量,第二个是已完成任务的数量。
我已经用 Spark 2.3.0 测试了这段代码。在生产环境中使用它之前,肯定需要一些额外的修饰,尤其是一些更复杂的错误检查。不仅可以通过计算已完成的任务,还可以通过计算失败的任务来改进统计数据。
| 归档时间: |
|
| 查看次数: |
825 次 |
| 最近记录: |