使用状态处理器 api 的 Flink 状态后端配置

Fra*_*ray 5 apache-flink

我使用 state-processor-api ,因为它被发布来引导我的 flink 状态。我使用 RocksDBStateBackend 并且它有效。我们最近使用了 flink 1.13,RocksDBStateBackend 已被弃用,取而代之的是 EmbeddedRocksDBStateBackend。

我的问题:

自从 API 发生变化以及我开发了新的引导程序作业以来,我遇到了以下异常:

引起原因:java.io.IOException:状态大小大于允许的最大内存支持状态。大小=85356498,最大大小=5242880。考虑使用不同的状态后端,例如文件系统状态后端。

在这里我声明我的statebackend:

val backend = new EmbeddedRocksDBStateBackend(true)
Run Code Online (Sandbox Code Playgroud)

在这里我创建我的保存点:

  Savepoint
    .create(backend, MAX_PARALLELISM)
    .withOperator("my_operator", transformMyOperator)
    .write(savepointPath)
Run Code Online (Sandbox Code Playgroud)

此外,我的 flink 集群配置为使用 RocksDB 状态后端,所有其他 flink 拓扑都使用 RocksDB 后端。

所以我想知道为什么我收到一个异常,说我不应该使用内存状态后端,因为我使用 RocksDB。欢迎任何帮助。

nom*_*moa 3

这是由 1.13 中的错误引起的,请参阅FLINK-23728,运行 1.14.0-RC0 确实为我解决了这个问题。