如何确保flink作业已完成执行然后执行一些任务

Ama*_*ngh 5 apache-flink flink-streaming flink-cep

我想在 flink 作业完成后执行一些任务,在 Intellij 中运行代码时没有任何问题,但是在 shell 文件中运行 Flink jar 时出现问题。我正在使用下面的行来确保 flink 程序的执行完成

//start the execution

JobExecutionResult jobExecutionResult = envrionment.execute(" Started the execution ");

 is_job_finished = jobExecutionResult.isJobExecutionResult();
Run Code Online (Sandbox Code Playgroud)

我不确定,上面的检查是否正确?

然后我在下面的方法中使用上面的变量来执行一些任务

    if(print_mode && is_job_finished){



        System.out.println(" \n \n -- System related  variables  -- \n");

        System.out.println(" Stream_join Window length = " + WindowLength_join__ms + " milliseconds");
        System.out.println(" Input rate for stream RR  = " + input_rate_rr_S + " events/second");
        System.out.println("Stream RR Runtime = " + Stream_RR_RunTime_S + " seconds");
        System.out.println(" # raw events in stream RR  = " + Total_Number_Of_Events_in_RR + "\n");

}
Run Code Online (Sandbox Code Playgroud)

有什么建议 ?

mst*_*tzn 0

您可以将作业侦听器注册到执行环境。

例如

env.registerJobListener(new JobListener {
      //Callback on job submission.
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUBMIT SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    //Callback on job execution finished, successfully or unsuccessfully.
      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })
Run Code Online (Sandbox Code Playgroud)