Apache Spark:如何取消代码中的作业并终止正在运行的任务?

Zoh*_*eir 5 hadoop scala hadoop-yarn apache-spark

我正在 Hadoop 集群上运行 Spark 应用程序(版本 1.6.0),并在客户端模式下使用 Yarn(版本 2.6.0)。我有一段运行长时间计算的代码,如果它花费的时间太长,我想杀死它(然后运行一些其他函数)。
这是一个例子:

val conf = new SparkConf().setAppName("TIMEOUT_TEST")
val sc = new SparkContext(conf)
val lst = List(1,2,3)
// setting up an infite action
val future = sc.parallelize(lst).map(while (true) _).collectAsync()

try {
    Await.result(future, Duration(30, TimeUnit.SECONDS))
    println("success!")
} catch {
    case _:Throwable =>
        future.cancel()
        println("timeout")
}

// sleep for 1 hour to allow inspecting the application in yarn
Thread.sleep(60*60*1000)
sc.stop()
Run Code Online (Sandbox Code Playgroud)

超时设置为 30 秒,但当然计算是无限的,因此等待 future 的结果将抛出异常,该异常将被捕获,然后 future 将被取消并执行备份函数。
这一切都运行得很好,只是取消的作业没有完全终止:当查看应用程序的 Web UI 时,作业被标记为失败,但我可以看到内部仍然有正在运行的任务。

当我使用 SparkContext.cancelAllJobs 或 SparkContext.cancelJobGroup 时,也会发生同样的情况。问题是,即使我设法继续执行我的程序,已取消作业的正在运行的任务仍然占用宝贵的资源(这最终会让我的速度几乎停止)。

总而言之:如何以同时终止该作业的所有正在运行的任务的方式终止 Spark 作业?(与现在发生的情况相反,即停止作业运行新任务,但让当前正在运行的任务完成)

更新:
在很长一段时间忽略这个问题之后,我们找到了一个混乱但有效的小解决方法。我们没有尝试从 Spark 应用程序中终止相应的 Spark 作业/阶段,而是只是在超时发生时记录所有活动阶段的阶段 ID,并向用于终止的 Spark Web UI 提供的 URL 发出 HTTP GET 请求说阶段。

Bor*_*ris 3

为了未来的访问者,Spark 从 2.0.3 开始引入了Spark 任务收割机,它确实(或多或少)解决了这个场景,并且是一个内置的解决方案。请注意,如果任务没有响应,最终可能会杀死执行程序。

此外,一些内置的 Spark 数据源已被重构,以更好地响应 Spark:

对于 1.6.0 版本,Zohar 的解决方案是一种“混乱但高效”的解决方案。