我是新的flink用户,我有以下问题.我在YARN集群上使用flink将从RDBMS提取的相关数据传输到HBase.我在具有多个ExecutionEnvironments(每个RDB表一个以并行传输表行)的java上编写flink批处理应用程序,以按顺序传递表(因为env.execute()的调用是阻塞的).
我像这样开始YARN会话
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/yarn-session.sh -n 1 -s 4 -d -jm 2048 -tm 8096
Run Code Online (Sandbox Code Playgroud)
然后我在通过shell脚本transfer.sh启动的YARN会话上运行我的应用程序.它的内容就在这里
#!/bin/bash
export YARN_CONF_DIR=/etc/hadoop/conf
export FLINK_HOME=/opt/flink-1.3.1
export FLINK_CONF_DIR=$FLINK_HOME/conf
$FLINK_HOME/bin/flink run -p 4 transfer.jar
Run Code Online (Sandbox Code Playgroud)
当我从命令行手动启动此脚本时,它工作正常 - 作业逐个提交给YARN会话而没有错误.
现在我应该能够从另一个java程序运行这个脚本.为了这个目的,我使用
Runtime.exec("transfer.sh");
Run Code Online (Sandbox Code Playgroud)
(也许有更好的方法可以做到这一点?我已经在REST API中看到过,但由于作业管理器由YARN代理,因此存在一些困难).一开始就像往常一样工作 - 首先将几个工作提交到会话并成功完成.但以下工作未提交给YARN会议.在/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log中我看到错误(并且在DEBUG级别中找不到其他错误)
The program execution failed: JobClientActor seems to have died before the JobExecutionResult could be retrieved.
Run Code Online (Sandbox Code Playgroud)
我试图自己分析这个问题,发现在向JobClientActor(即YARN集群)发送带超时的ping请求时,JobClient类中发生了这个错误.我尝试增加多个心跳和超时选项,如akka.*.timeout,akka.watch.heartbeat.*和yarn.heartbeat-delay选项,但它没有解决问题 - 新的作业不会从CliFrontend提交给YARN会话.
两种情况(手动呼叫和来自另一个程序的呼叫)的环境是相同的.我打电话的时候
$ ps axu | grep transfer
Run Code Online (Sandbox Code Playgroud)
它会给我输出
/usr/lib/jvm/java-8-oracle/bin/java -Dlog.file=/opt/flink-1.3.1/log/flink-tsvetkoff-client-hadoop-dev1.log -Dlog4j.configuration=file:/opt/flink-1.3.1/conf/log4j-cli.properties -Dlogback.configurationFile=file:/opt/flink-1.3.1/conf/logback.xml -classpath /opt/flink-1.3.1/lib/flink-metrics-graphite-1.3.1.jar:/opt/flink-1.3.1/lib/flink-python_2.11-1.3.1.jar:/opt/flink-1.3.1/lib/flink-shaded-hadoop2-uber-1.3.1.jar:/opt/flink-1.3.1/lib/log4j-1.2.17.jar:/opt/flink-1.3.1/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.3.1/lib/flink-dist_2.11-1.3.1.jar:::/etc/hadoop/conf …Run Code Online (Sandbox Code Playgroud)