在 Flink 中以编程方式配置 S3 选项

syn*_*pse 4 hadoop amazon-s3 apache-flink flink-streaming

显然,当以编程方式设置 S3 选项时,Flink 1.14.0 无法正确转换它们。我正在创建一个像这样的本地环境来连接到本地 MinIO 实例:

  val flinkConf = new Configuration()
  flinkConf.setString("s3.endpoint", "http://127.0.0.1:9000")
  flinkConf.setString("s3.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")

  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(flinkConf)
Run Code Online (Sandbox Code Playgroud)

然后StreamingFileSink失败,出现大量堆栈跟踪,其中包含最相关的消息,Caused by: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint: 这意味着 Hadoop 尝试枚举所有凭证提供程序,而不是使用配置中的一组凭证提供程序。我究竟做错了什么?

bzu*_*bzu 6

我也花了很长时间试图弄清楚这一点。我找不到以编程方式设置它的方法,但最终将以下内容添加到我的 Flink java 项目根目录中的 src/main/resources/core-site.xml 中:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
    </property>
</configuration>
Run Code Online (Sandbox Code Playgroud)

然后我可以使用 AWS_PROFILE env var 来选择存储的凭证。这是针对带有 flink-s3-fs-hadoop 1.13.2 的 Flink