尝试从检查点重新启动时作业卡住

Col*_*etz 6 apache-flink flink-streaming

语境

我们使用 Flink 运行许多从 Kafka 读取数据的流作业,执行一些 SQL 转换并将输出写入 Kafka。它在 Kubernetes 上运行,有两个作业管理器和许多任务管理器。我们的作业使用 RocksDB 的检查点,并且我们的检查点写入 AWS S3 中的存储桶上。

最近,我们从 Flink 1.13.1 升级到 Flink 1.15.2。我们使用保存点机制来停止我们的作业并在新版本上重新启动它们。我们有两个 Kubernetes 集群。搬家后,他们俩似乎一切都很好。但几天后(第一个集群几乎一个月,第二个集群需要 2 或 3 天),我们现在遇到了其他问题(这可能与迁移到 Flink 1.15 有关,也可能与后来发生的迁移无关)。

问题描述

最近,我们注意到有一些作业无法启动。我们看到执行图中的“源”任务保持已创建状态,而图中的所有其他任务(ChangelogNormalize、Writer)都在运行。作业定期重新启动并出现错误(为了便于阅读而简化了堆栈跟踪):

java.lang.Exception: Cannot deploy task Source: source_consumer -> *anonymous_datastream_source$81*[211] (1/8) (de8f109e944dfa92d35cdc3f79f41e6f) - TaskManager (<address>) not responding after a rpcTimeout of 10000 ms
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$5(Execution.java:602)
    ...
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@<address>/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
    at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
    at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:580)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1723317240]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
Run Code Online (Sandbox Code Playgroud)

我们还在工作经理中注意到了这条消息:

Discarding oversized payload sent to Actor[akka.tcp://flink@<address>/user/rpc/taskmanager_0#1153219611]: max allowed size 10485760b bytes, actual size of encoded class org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 83938405 bytes.
Run Code Online (Sandbox Code Playgroud)

目前尚不清楚为什么会发送这么大的 Akka 消息。但当设置akka.framesize为更高的值(100MB)时,超时确实消失了。而之前卡住的任务CREATED就是现在INITIALIZING

然而,这份工作会持续INITIALIZING很长时间。有时它们会启动,有时会失败并出现错误:

java.lang.OutOfMemoryError: Java heap space
Run Code Online (Sandbox Code Playgroud)

增加任务管理器的内存对某些工作有帮助,但不是全部。总的来说,它们似乎需要更多的内存并且需要很长的时间来初始化。有时我们会从 S3 重置连接:

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
...
Caused by: java.lang.IllegalStateException: Connection pool shut down
Run Code Online (Sandbox Code Playgroud)

新观察结果 (08/02/2023):我们发现有问题的作业_metadata在其检查点中有一个非常大的文件(最大为 168MB)。更糟糕的是,每次作业从其检查点恢复时,它的大小似乎都会增加一倍(当重新启动后执行第一个检查点时,随后的检查点保持不变)。

问题

  • 提交任务时,什么可能导致 Akka 消息那么大?
  • Flink 1.13 和 Flink 1.15 之间是否发生了一些变化可以解释这些问题?
  • 我们如何确定是什么占用了所有堆内存?

Col*_*etz 7

以为我们不明白一切,我们找到了问题的根源并设法解决它。

TL;DR:当我们从 FlinkKafkaConsumer 切换到 KafkaSource 时,密钥topic-partition-offset-states保留在作业状态(检查点)中。虽然它不再使用,但它呈指数级增长,因此我们将其从检查点中删除(使用自定义 Java 代码)并重新启动一切。

详细信息:

  • 我们从 FlinkKafkaConsumer 切换到 KafkaSource。我们确保setStartingOffsets(OffsetsInitializer.committedOffsets())从保存点恢复作业时提交并使用了它们的偏移量(如https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#中所述)弃用-flinkkafkaconsumer)。这确实起作用了,我们的工作在 Flink 1.15 中正确恢复,偏移量正确,状态看起来不错。
  • 然而,源操作员似乎将密钥保留topic-partition-offset-states在其状态中。FlinkKakfaConsumer 使用了它,但 KafkaSource 没有使用它。
  • 由于某种原因(我们无法确定),topic-partition-offset-states当我们的作业恢复时,有时偏移量的长度会加倍(我们在 Kubernetes 上使用 HA,因此如果我们重新启动 Flink,这种情况可能会定期发生)。
  • 一段时间后,这个偏移量列表变得如此之大,以至于_metadata检查点中的文件变得非常大(168MB)。这导致 Akka 超时,因为它们超出了akka.framesize. 增加帧大小有帮助,但增加了内存压力,导致许多堆内存错误。_metadata此外,随着大小不断翻倍超过 10MB,这只会让问题变得更糟。
  • completedCheckpoint高可用性存储目录中的文件也存在同样的问题。
  • 为了解决这个问题,我们必须:
    • 反序列化CompletedCheckpoint
    • 更新它们,以从状态中删除topic-partition-offset-states密钥(使这些文件小得多)。
    • 重新序列化它们并替换原始文件。
  • 重新启动任务管理器和作业管理器后,作业已正确加载。在他们编写第一个检查点后,_metadata文件恢复到合理的大小。