Gil*_*ove 5 apache-flink flink-streaming
我们目前面临 Flink 中文件系统抽象的问题。我们有一个可以动态连接到 S3 源的作业(意味着它是在运行时定义的)。我们在代码中发现了一个错误,这可能是由于对文件系统工作方式的错误假设造成的。
在作业初始化期间(因此在作业管理器中),我们操作 FS 来检查某些文件是否存在,以便在执行作业之前优雅地失败。在我们的例子中,我们需要动态设置 FS。它可以是 HDFS、AWS 上的 S3 或 MinIO 上的 S3。我们希望 FS 配置特定于作业,并且与集群配置不同(不同的访问密钥、不同的端点等)。
以下是我们用于执行此操作的代码摘录:
private void validateFileSystemAccess(Configuration configuration) throws IOException {
// Create a plugin manager from the configuration
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
// Init the FileSystem from the configuration
FileSystem.initialize(configuration, pluginManager);
// Validate the FileSystem: an exception is thrown if FS configuration is wrong
Path archiverPath = new Path(this.archiverPath);
archiverPath.getFileSystem().exists(new Path("/"));
}
Run Code Online (Sandbox Code Playgroud)
开始特定类型的工作后,我们注意到:
如果我们不部署这种作业,工件的上传和检查点将按预期在集群上工作。
我们认为这个问题可能来自于FileSystem.initialize()覆盖所有文件系统的配置。我们认为正因为如此,下一次调用将FileSystem.get()返回我们配置的文件系统,validateFileSystemAccess而不是集群配置的文件系统。
我们的假设是正确的吗?如果是这样,我们如何在不影响整个集群的情况下为文件系统提供特定的配置?
| 归档时间: |
|
| 查看次数: |
211 次 |
| 最近记录: |