Apache Flink:文件 STDOUT 在 TaskExecutor 上不可用

Dan*_*ich 2 logging scala docker apache-flink

我使用官方 flink 存储库中的以下 docker-compose.yml 启动了 flink。我只添加了到外部hadoop网络的连接。

version: "2.1"

networks:
  hadoop:
    external:
      name: flink_hadoop

services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)

此后一切运行,我可以访问 WebUI。

然后我打包了以下工作。

import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie

object HdfsJob {
  private lazy val logger = LoggerFactory.getLogger(getClass)

  def main(args: Array[String]) {
    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val actorMovies = env
      .readCsvFile[ActorMovie](
      "hdfs://namenode:8020/source-data/test.tsv",
      "\r\n",
      "\t",
      includedFields = Array(2,3,5),
      pojoFields = Array("actor",
                         "film",
                         "character"))

    actorMovies.print

    // execute program
    env.execute("Flink Batch Scala API Skeleton")
  }
}
Run Code Online (Sandbox Code Playgroud)

它只是将 hdfs 中的 tsv 文件读取到 pojos 的数据集中并打印出来。当我让它在本地运行时,一切正常。但是当我上传 .jar 并让它在集群上运行时,jobmanager 会记录以下异常:

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file STDOUT is not available on the TaskExecutor.

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG is not available on the TaskExecutor.

显然任务管理器不包含任何日志,当前的问题是什么。

Til*_*ann 6

当在 Docker 上运行 Flink 时,该脚本将在前台docker-entrypoint.sh启动 Flink 进程(TaskExecutorJobMaster)。这会导致 Flink 既不会将其重定向到文件中,也不会登录到文件中。相反,Flink 也会记录到. 这样,您就可以通过查看 docker 容器的日志和 stdout 输出。STDOUTSTDOUTdocker logs

如果你想改变这种行为,改变docker-entrypoint.sh并通过start而不是start-foreground

if [ "${CMD}" == "${TASK_MANAGER}" ]; then
    $FLINK_HOME/bin/taskmanager.sh start "$@"
else
    $FLINK_HOME/bin/standalone-job.sh start "$@"
fi

sleep 1
exec /bin/bash -c "tail -f $FLINK_HOME/log/*.log"
Run Code Online (Sandbox Code Playgroud)

更新

当使用 Flink 的DataSetAPI 时,调用该方法DataSet::print实际上会将相应的数据DataSet从集群检索回客户端,并在客户端打印到 STDOUT。由于检索的原因,此方法仅在 Flink 的 CLI 客户端通过 提交作业时才有效bin/flink run <job.jar>。此行为与在程序执行位置打印DataStream::print的方法不同。DataStreamTaskManagers

如果您想DataSet在 上打印结果TaskManager,则需要调用DataSet::printOnTaskManager而不是print.