感谢您阅读这个问题,它可能看起来很长,但我会尝试在其中获取尽可能多的信息以帮助获得答案。
目前,我们的 Flink 集群遇到了调度问题。
症状是我们的部分/大部分/全部(这取决于情况,症状并不总是相同)任务显示为“已计划”,但在超时后失败。然后,作业将显示为RUNNING。
失败的异常如下:
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
Run Code Online (Sandbox Code Playgroud)
经过分析,我们假设(我们无法证明这一点,因为这部分代码没有太多日志)失败是由于同时提交多个作业时发生的死锁/竞争条件造成的。 Flink 集群,即使集群中有足够的可用槽。
实际上,我们有 52 个可用任务槽位的错误,并且有 12 个作业未安排。
PS:不久前,我在ML上问了或多或少相同的问题,但放弃了它,如果这被认为是交叉询问,我很抱歉,这不是有意的。我们只是打开一个新线程,因为我们有更多信息并且问题再次出现。
我们目前面临 Flink 中文件系统抽象的问题。我们有一个可以动态连接到 S3 源的作业(意味着它是在运行时定义的)。我们在代码中发现了一个错误,这可能是由于对文件系统工作方式的错误假设造成的。
在作业初始化期间(因此在作业管理器中),我们操作 FS 来检查某些文件是否存在,以便在执行作业之前优雅地失败。在我们的例子中,我们需要动态设置 FS。它可以是 HDFS、AWS 上的 S3 或 MinIO 上的 S3。我们希望 FS 配置特定于作业,并且与集群配置不同(不同的访问密钥、不同的端点等)。
以下是我们用于执行此操作的代码摘录:
private void validateFileSystemAccess(Configuration configuration) throws IOException {
// Create a plugin manager from the configuration
PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder(configuration);
// Init the FileSystem from the configuration
FileSystem.initialize(configuration, pluginManager);
// Validate the FileSystem: an exception is thrown if FS configuration is wrong
Path archiverPath = new Path(this.archiverPath);
archiverPath.getFileSystem().exists(new Path("/"));
}
Run Code Online (Sandbox Code Playgroud)
开始特定类型的工作后,我们注意到:
如果我们不部署这种作业,工件的上传和检查点将按预期在集群上工作。
我们认为这个问题可能来自于FileSystem.initialize()覆盖所有文件系统的配置。我们认为正因为如此,下一次调用将FileSystem.get()返回我们配置的文件系统,validateFileSystemAccess …