Amazon EMR 在为 Apache-Flink 提交作业时出现错误,Hadoop 可恢复

Sud*_*nka 2 amazon-s3 hadoop2 apache-flink

Added Depedency Pom Details :

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>1.7.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.7.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>1.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop</artifactId>
            <version>1.7.1</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.8.5</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.11.529</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-connectors</artifactId>
            <version>1.1.5</version>
            <type>pom</type>
        </dependency>
    </dependencies>
Run Code Online (Sandbox Code Playgroud)

java.lang.UnsupportedOperationException:Hadoop 上的可恢复写入器仅支持 HDFS 和 Hadoop 2.7 版或更高版本的 org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) at org.apache.flink .runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) 在 org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) 在 org.apache.flink.streaming.api.functions .sink.filesystem.Buckets.(Buckets.java:112) 在 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) 在 org.apache.flink。 stream.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) 在 org.apache.flink。stream.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api。在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask。在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java: [704] 第704话tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState) java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState) java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org .apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org .apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)

小智 7

Flink 使用称为ServiceLoader 的东西来加载与可插拔文件系统接口所需的组件。如果你想看看 Flink 在代码中是如何做到这一点的,请前往org.apache.flink.core.fs.FileSystem. 请注意initialize使用RAW_FACTORIES变量的函数。RAW_FACTORIES由函数创建loadFileSystems,您可以看到它使用了 Java 的ServiceLoader.

在 Flink 上启动应用程序之前,需要设置文件系统组件。这意味着您的 Flink 应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。

EMR 不提供 Flink 需要使用 S3 作为开箱即用的流文件接收器的 S3 文件系统组件。抛出此异常不是因为版本不够高,而是因为 Flink 在没有与s3方案匹配的文件系统的情况下加载了 HadoopFileSystem (请参阅此处的代码)。

您可以通过为我的 Flink 应用程序启用 DEBUG 日志记录级别来查看您的文件系统是否正在加载,EMR 允许您在配置中执行此操作:

{
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.rootLogger": "DEBUG,file"
    }
  },{
    "Classification": "flink-log4j-yarn-session",
    "Properties": {
      "log4j.rootLogger": "DEBUG,stdout"
    }
  }
Run Code Online (Sandbox Code Playgroud)

相关日志在 YARN 资源管理器中可用,查看单个节点的日志。搜索字符串"Added file system"应该可以帮助您找到所有成功加载的文件系统。

在这项调查中也很方便的是通过 SSH 连接到主节点并使用flink-scala REPL,在那里我可以看到 FileSystem Flink 决定加载给定文件 URI 的内容。

解决方案是/usr/lib/flink/lib/在启动 Flink 应用程序之前删除 S3 文件系统实现的 JAR 。这可以通过获取flink-s3-fs-hadoopor的引导操作来完成flink-s3-fs-presto(取决于您使用的实现)。我的引导操作脚本如下所示:

sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib

sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar
Run Code Online (Sandbox Code Playgroud)