Ama*_*ngh 5 apache-flink flink-streaming flink-cep
我想在 flink 作业完成后执行一些任务,在 Intellij 中运行代码时没有任何问题,但是在 shell 文件中运行 Flink jar 时出现问题。我正在使用下面的行来确保 flink 程序的执行完成
//start the execution
JobExecutionResult jobExecutionResult = envrionment.execute(" Started the execution ");
is_job_finished = jobExecutionResult.isJobExecutionResult();
Run Code Online (Sandbox Code Playgroud)
我不确定,上面的检查是否正确?
然后我在下面的方法中使用上面的变量来执行一些任务
if(print_mode && is_job_finished){
System.out.println(" \n \n -- System related variables -- \n");
System.out.println(" Stream_join Window length = " + WindowLength_join__ms + " milliseconds");
System.out.println(" Input rate for stream RR = " + input_rate_rr_S + " events/second");
System.out.println("Stream RR Runtime = " + Stream_RR_RunTime_S + " seconds");
System.out.println(" # raw events in stream RR = " + Total_Number_Of_Events_in_RR + "\n");
}
Run Code Online (Sandbox Code Playgroud)
有什么建议 ?
您可以将作业侦听器注册到执行环境。
例如
env.registerJobListener(new JobListener {
//Callback on job submission.
override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUBMIT SUCCESS")
} else {
log.info("FAIL")
}
}
//Callback on job execution finished, successfully or unsuccessfully.
override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
})
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1226 次 |
| 最近记录: |