创建作业时使用另一个文件系统配置

Gil*_*ove 5 apache-flink flink-streaming

概括

我们目前面临 Flink 中文件系统抽象的问题。我们有一个可以动态连接到 S3 源的作业(意味着它是在运行时定义的)。我们在代码中发现了一个错误,这可能是由于对文件系统工作方式的错误假设造成的。

错误解释

在作业初始化期间(因此在作业管理器中),我们操作 FS 来检查某些文件是否存在,以便在执行作业之前优雅地失败。在我们的例子中,我们需要动态设置 FS。它可以是 HDFS、AWS 上的 S3 或 MinIO 上的 S3。我们希望 FS 配置特定于作业,并且与集群配置不同(不同的访问密钥、不同的端点等)。

以下是我们用于执行此操作的代码摘录:

  private void validateFileSystemAccess(Configuration configuration) throws IOException {
    // Create a plugin manager from the configuration
    PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);

    // Init the FileSystem from the configuration
    FileSystem.initialize(configuration, pluginManager);

    // Validate the FileSystem: an exception is thrown if FS configuration is wrong
    Path archiverPath = new Path(this.archiverPath);
    archiverPath.getFileSystem().exists(new Path("/"));
  }
Run Code Online (Sandbox Code Playgroud)

开始特定类型的工作后,我们注意到:

  1. 检查点不适用于此作业,它会引发凭据错误。
  2. 作业管理器无法上传历史服务器所需的所有已运行的所有作业(不仅是这种特定类型的作业)所需的工件。

如果我们不部署这种作业,工件的上传和检查点将按预期在集群上工作。

我们认为这个问题可能来自于FileSystem.initialize()覆盖所有文件系统的配置。我们认为正因为如此,下一次调用将FileSystem.get()返回我们配置的文件系统,validateFileSystemAccess而不是集群配置的文件系统。

问题

我们的假设是正确的吗?如果是这样,我们如何在不影响整个集群的情况下为文件系统提供特定的配置?