Spark-如何通过“ SparkLauncher”识别失败的作业

Yoh*_*age 6 apache-spark

我正在使用Spark 2.0,有时由于输入问题我的工作失败了。例如,我正在根据日期从S3文件夹中读取CSV文件,并且如果当前日期没有数据,则我的工作没有任何可处理的,因此它引发了如下异常。这被打印在驾驶员的日志中。

Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3n://data/2016-08-31/*.csv;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:58)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:174)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
...
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 16/09/03 10:51:54 INFO SparkContext: Invoking stop() from shutdown hook
 16/09/03 10:51:54 INFO SparkUI: Stopped Spark web UI at http://192.168.1.33:4040
 16/09/03 10:51:54 INFO StandaloneSchedulerBackend: Shutting down all executors
 16/09/03 10:51:54 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
Spark App app-20160903105040-0007 state changed to FINISHED
Run Code Online (Sandbox Code Playgroud)

但是,尽管有这个未捕获的异常,但我的Spark Job状态为“完成”。我希望它处于“ FAILED”状态,因为有一个例外。为什么将其标记为“完成”?我如何找出工作是否失败?

注意:我正在使用SparkLauncher生成Spark作业,并通过AppHandle监听状态更改。但是我收到的状态更改已完成,而我期望失败。

小智 1

您看到的FINISHED是 Spark 应用程序而不是作业。由于 Spark 上下文能够正确启动和停止,因此已完成。

您可以使用JavaSparkStatusTracker查看任何作业信息。对于活动作业,无需执行任何其他操作,因为它具有“.getActiveJobIds”方法。

为了完成/失败,您需要在调用 Spark 执行的线程中设置作业组 ID:

JavaSparkContext sc;
... 
sc.setJobGroup(MY_JOB_ID, "Some description");
Run Code Online (Sandbox Code Playgroud)

然后,只要您需要,您就可以读取指定作业组中每个作业的状态:

JavaSparkStatusTracker statusTracker = sc.statusTracker();
for (int jobId : statusTracker.getJobIdsForGroup(JOB_GROUP_ALL)) {
    final SparkJobInfo jobInfo = statusTracker.getJobInfo(jobId);
    final JobExecutionStatus status = jobInfo.status();
}
Run Code Online (Sandbox Code Playgroud)

JobExecutionStatus 可以是RUNNINGSUCCEEDEDFAILEDUNKNOWN之一;最后一个是作业已提交但尚未实际开始的情况。

注意:所有这些都可以从 Spark 驱动程序获得,它是您使用SparkLauncher启动的 jar 。所以上面的代码应该放入jar中。

如果您想检查Spark Launcher方面是否存在任何故障,如果检测到作业失败,您可以使用System.exit(1)类型退出由 Jar 启动的应用程序,退出代码不为 0 。SparkLauncher::launch返回的Process包含exitValue方法,因此您可以检测它是否失败。