我有以下代码:
val launcher = new SparkLauncher()
.setSparkHome(SparkConfig.sparkHome)
.setAppResource(SparkConfig.sparkJobJar)
.setMainClass(SparkConfig.sparkJobMainClass)
.setMaster(SparkConfig.sparkMaster)
.setConf("spark.driver.extraJavaOptions", SparkConfig.sparkJobSystemProperties)
.setConf("spark.executor.extraJavaOptions", SparkConfig.sparkJobSystemProperties)
.setConf("spark.executor.memory", SparkConfig.sparkExecutorMemory)
.setConf("spark.driver.memory", SparkConfig.sparkDriverMemory)
.setConf("spark.executor.cores", SparkConfig.sparkExecutorCores)
.setConf("spark.network.timeout", SparkConfig.sparkNetworkTimeout)
.setConf("spark.app.name", s"my-app")
.setVerbose(true)
.addAppArgs(processId)
val handle = launcher.startApplication(JobListenerStatus)
//waiting for the lock releases.
isFinishedLock.acquire()
handle.getState match {
case State.FAILED | State.KILLED => {
logger.error(String.format("Process %s: Error while processing a job spark task, job finished with %s state. Please refer to job-manager logs to more information...", processId, handle.getState().name()))
throw new SparkJobExecutionException(handle.getState)
}
case _ => {
logger.info(String.format("Process %s: Finished …Run Code Online (Sandbox Code Playgroud)