Sud*_*nka 2 amazon-s3 hadoop2 apache-flink
Added Depedency Pom Details :
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.7.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.7.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop</artifactId>
<version>1.7.1</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.529</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-connectors</artifactId>
<version>1.1.5</version>
<type>pom</type>
</dependency>
</dependencies>
Run Code Online (Sandbox Code Playgroud)
java.lang.UnsupportedOperationException:Hadoop 上的可恢复写入器仅支持 HDFS 和 Hadoop 2.7 版或更高版本的 org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.(HadoopRecoverableWriter.java:57) at org.apache.flink .runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) 在 org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) 在 org.apache.flink.streaming.api.functions .sink.filesystem.Buckets.(Buckets.java:112) 在 org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242) 在 org.apache.flink。 stream.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) 在 org.apache.flink。stream.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api。在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask。在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java: [704] 第704话tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState) java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)tryRestoreFunction(StreamingFunctionUtils.java:178) at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState(AbstractUdfStreamOperator.UinitializeState) java:96) 在 org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread .run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)restoreFunctionState(StreamingFunctionUtils.java:160) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator. java:278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)在 org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org .apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)278) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) 在 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) 在 org .apache.flink.runtime.taskmanager.Task.run(Task.java:704) 在 java.lang.Thread.run(Thread.java:748)
小智 7
Flink 使用称为ServiceLoader 的东西来加载与可插拔文件系统接口所需的组件。如果你想看看 Flink 在代码中是如何做到这一点的,请前往org.apache.flink.core.fs.FileSystem. 请注意initialize使用RAW_FACTORIES变量的函数。RAW_FACTORIES由函数创建loadFileSystems,您可以看到它使用了 Java 的ServiceLoader.
在 Flink 上启动应用程序之前,需要设置文件系统组件。这意味着您的 Flink 应用程序不需要捆绑这些组件,它们应该为您的应用程序提供。
EMR 不提供 Flink 需要使用 S3 作为开箱即用的流文件接收器的 S3 文件系统组件。抛出此异常不是因为版本不够高,而是因为 Flink 在没有与s3方案匹配的文件系统的情况下加载了 HadoopFileSystem (请参阅此处的代码)。
您可以通过为我的 Flink 应用程序启用 DEBUG 日志记录级别来查看您的文件系统是否正在加载,EMR 允许您在配置中执行此操作:
{
"Classification": "flink-log4j",
"Properties": {
"log4j.rootLogger": "DEBUG,file"
}
},{
"Classification": "flink-log4j-yarn-session",
"Properties": {
"log4j.rootLogger": "DEBUG,stdout"
}
}
Run Code Online (Sandbox Code Playgroud)
相关日志在 YARN 资源管理器中可用,查看单个节点的日志。搜索字符串"Added file system"应该可以帮助您找到所有成功加载的文件系统。
在这项调查中也很方便的是通过 SSH 连接到主节点并使用flink-scala REPL,在那里我可以看到 FileSystem Flink 决定加载给定文件 URI 的内容。
解决方案是/usr/lib/flink/lib/在启动 Flink 应用程序之前删除 S3 文件系统实现的 JAR 。这可以通过获取flink-s3-fs-hadoopor的引导操作来完成flink-s3-fs-presto(取决于您使用的实现)。我的引导操作脚本如下所示:
sudo mkdir -p /usr/lib/flink/lib
cd /usr/lib/flink/lib
sudo curl -O https://search.maven.org/remotecontent?filepath=org/apache/flink/flink-s3-fs-hadoop/1.8.1/flink-s3-fs-hadoop-1.8.1.jar
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1205 次 |
| 最近记录: |