Rap*_*nto 5 scala hadoop-yarn apache-spark
I have the following code:
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.launcher.SparkAppHandle.State

val launcher = new SparkLauncher()
  .setSparkHome(SparkConfig.sparkHome)
  .setAppResource(SparkConfig.sparkJobJar)
  .setMainClass(SparkConfig.sparkJobMainClass)
  .setMaster(SparkConfig.sparkMaster)
  .setConf("spark.driver.extraJavaOptions", SparkConfig.sparkJobSystemProperties)
  .setConf("spark.executor.extraJavaOptions", SparkConfig.sparkJobSystemProperties)
  .setConf("spark.executor.memory", SparkConfig.sparkExecutorMemory)
  .setConf("spark.driver.memory", SparkConfig.sparkDriverMemory)
  .setConf("spark.executor.cores", SparkConfig.sparkExecutorCores)
  .setConf("spark.network.timeout", SparkConfig.sparkNetworkTimeout)
  .setConf("spark.app.name", "my-app")
  .setVerbose(true)
  .addAppArgs(processId)
val handle = launcher.startApplication(JobListenerStatus)
// Block until the listener releases the lock (the application reached a final state).
isFinishedLock.acquire()
handle.getState match {
  case State.FAILED | State.KILLED =>
    logger.error(String.format("Process %s: Error while processing a Spark job task, job finished with %s state. Please refer to the job-manager logs for more information...", processId, handle.getState().name()))
    throw new SparkJobExecutionException(handle.getState)
  case _ =>
    logger.info(String.format("Process %s: Finished with success.", processId))
}
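The definition of isFinishedLock is not shown in the post. As a minimal sketch of the waiting mechanism, assuming it is a java.util.concurrent.Semaphore created with zero permits, the acquire/release handshake between the launching thread and the listener could look like this (LockSketch, onStateChanged and waitForFinalState are hypothetical names used only for illustration, and the timeout is an addition that the original code does not have):

```scala
import java.util.concurrent.{Semaphore, TimeUnit}

object LockSketch {
  // Assumption: the lock starts with zero permits, so acquiring blocks
  // until some other thread calls release().
  val isFinishedLock = new Semaphore(0)

  // Stand-in for the listener's stateChanged callback:
  // releases the lock only when the reported state is final.
  def onStateChanged(isFinal: Boolean): Unit =
    if (isFinal) isFinishedLock.release()

  // Fires the callback from another thread and waits for the permit.
  // Returns true if the permit was obtained within the timeout.
  def waitForFinalState(): Boolean = {
    val notifier = new Thread(new Runnable {
      def run(): Unit = onStateChanged(isFinal = true)
    })
    notifier.start()
    val acquired = isFinishedLock.tryAcquire(5, TimeUnit.SECONDS)
    notifier.join()
    acquired
  }
}
```

Using tryAcquire with a timeout instead of a bare acquire() is a design choice for the sketch: it keeps the caller from blocking forever if no final state is ever reported.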
In my Spark job, for example:
def main(args: Array[String]): Unit = {
  try {
    // doTheJob tasks ...... and simulate an error
  } catch {
    case t: Throwable =>
      log.error("errors")
      throw t
  }
}
This is my JobListenerStatus:
object JobListenerStatus extends SparkAppHandle.Listener {
  // Logs the YARN application id and the current state of the handle.
  private def info(handle: SparkAppHandle): Unit = {
    val appId: String = Option(handle.getAppId).getOrElse("Not set")
    logger.info(String.format("Process %s: Yarn Application id %s with current state %s.", processId, appId, handle.getState.name()))
  }
  override def infoChanged(handle: SparkAppHandle): Unit = {
    info(handle)
  }
  override def stateChanged(handle: SparkAppHandle): Unit = {
    info(handle)
    // Wake up the waiting thread once the application reaches a final state.
    if (handle.getState.isFinal) {
      isFinishedLock.release()
    }
  }
}
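The problem is that even though my program throws an error (throw t), the final state of the job is still FINISHED. How can I throw an exception so that the job ends in the FAILED state?
Final state log:
Yarn Application id application_1475523421187_9534 with current state FINISHED.
I am using the following classes:
org.apache.spark.launcher.SparkAppHandle
org.apache.spark.launcher.SparkLauncher
Thanks in advance.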
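One possible way to detect the failure without relying on the handle state is to start the job with SparkLauncher.launch(), which returns a plain java.lang.Process, and to inspect the exit code of the child spark-submit process. The sketch below only covers the exit-code check. It assumes that spark-submit exits with a non-zero code when main rethrows the exception, which should be verified for the Spark version and deploy mode in use. ExitCodeCheck, resultFor, await and JobFailedException are hypothetical names, not part of the Spark API.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical exception type, standing in for the
// SparkJobExecutionException used in the question.
final case class JobFailedException(exitCode: Int)
  extends RuntimeException(s"Spark job exited with code $exitCode")

object ExitCodeCheck {
  // Maps a process exit code to a result: 0 is treated as success,
  // any other value as a failed job (an assumption about spark-submit).
  def resultFor(exitCode: Int): Try[Unit] =
    if (exitCode == 0) Success(()) else Failure(JobFailedException(exitCode))

  // Blocks until the process terminates, then classifies its exit code.
  // The process is assumed to come from SparkLauncher.launch().
  def await(process: java.lang.Process): Try[Unit] =
    resultFor(process.waitFor())
}
```

With the launcher from the question, the call would be along the lines of ExitCodeCheck.await(launcher.launch()). When the process is started this way, its stdout and stderr should be consumed or redirected, otherwise the child can block on a full output buffer.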