从 IDE 运行 flink 时如何设置 presto.s3.xxx 属性?

Chr*_*now 1 apache-flink

我能够成功运行我的 flink 作业,该作业使用./bin/flink run ....

为此,我必须将 flink-s3-fs-presto jar 复制到我的$FLINK_HOME/lib文件夹中,并且我还必须在我的以下文件中配置我的 S3 连接详细信息flink-conf.yaml

你需要在 Flink 的 flink-conf.yaml 中同时配置 s3.access-key 和 s3.secret-key :

s3.access-key: your-access-key
s3.secret-key: your-secret-key
Run Code Online (Sandbox Code Playgroud)

来源:flink aws 文档

我还必须设置一个属性,s3.endpoint因为我使用的是来自 IBM Cloud 的 S3。

当我使用./bin/flink run.

但是,当我尝试从 IDE (IntelliJ) 运行我的作业时,出现以下错误:

org.apache.flink.runtime.client.JobExecutionException:无法初始化任务“DataSink(TextOutputFormat(s3://xxxx/folder)-UTF-8)”:无法从服务端点加载凭据

我在 IDE 运行作业中设置了一个环境变量,FLINK_CONF_DIR指向我的 flink-conf.yaml,我可以看到我的配置属性被选中:

11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.access-key, ****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.secret-key, ****
11:04:39,487 INFO  org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: s3.endpoint, s3-api.us-geo.objectstorage.softlayer.net
Run Code Online (Sandbox Code Playgroud)

但是,我收到一个错误,表明当我从 IDE 运行时,这些属性没有传递到 presto 库:

Caused by: org.apache.flink.fs.s3presto.shaded.com.amazonaws.SdkClientException: Unable to load credentials from service endpoint
Run Code Online (Sandbox Code Playgroud)

另外,为了验证这个理论,如果我在从 IDE 运行时逐步执行代码,我可以看到我的端点属性没有被应用:

在此处输入图片说明

...并深入了解Hadoop配置,我可以看到flink配置是一个空映射:

在此处输入图片说明

再深入一点,我可以看到org.apache.flink.core.fs.FileSystem#getUnguardedFileSystem()正在创建一个新的空配置:

        // this "default" initialization makes sure that the FileSystem class works
        // even when not configured with an explicit Flink configuration, like on
        // JobManager or TaskManager setup
        if (FS_FACTORIES.isEmpty()) {
            initialize(new Configuration());
        }
Run Code Online (Sandbox Code Playgroud)

从 IDE 运行时如何配置s3.access-key,s3.secret-keys3.endpoint属性?

小智 5

只需致电

FileSystem.initialize(GlobalConfiguration.loadConfiguration(System.getenv("FLINK_CONF_DIR")));
Run Code Online (Sandbox Code Playgroud)

env.execute()
Run Code Online (Sandbox Code Playgroud)

将解决问题。

请记住,您仍然需要将您的密钥和访问密钥放在 flink-conf.yaml 中。