Apache Flink: using S3 for the state backend and checkpoints

Asked by Kes*_*dhi · Tags: amazon-s3, checkpoint, apache-flink, checkpointing, flink-streaming

Background

  • I was planning to use S3 to store Flink's checkpoints using the FsStateBackend, but I kept getting the following error.

Error

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.

Flink version: 1.10.0

Answered by Kes*_*dhi

I found the solution to the above issue, so I am listing the required steps below.

Steps

  1. We need to add some configuration entries to the flink-conf.yaml file, which I have listed below.
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ # s3://<your-bucket>/<path-to-checkpoints>
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ # deprecated key; same value as state.checkpoints.dir


s3.access-key: XXXXXXXXXXXXXXXXXXX # your access key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # your secret key

s3.endpoint: http://127.0.0.1:9000 # your endpoint hostname (I have used MinIO)
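For local testing, the same options can also be supplied programmatically instead of editing flink-conf.yaml. Below is a minimal sketch, not part of the original answer, assuming a local environment created inside an IDE (the plugin JARs from the next step are still required at runtime):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Mirror the flink-conf.yaml entries in a Configuration object and hand it
// to a local environment; on a real cluster keep using flink-conf.yaml.
Configuration conf = new Configuration();
conf.setString("state.backend", "filesystem");
conf.setString("state.checkpoints.dir", "s3://s3-bucket/checkpoints/");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);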
  2. After completing the first step, we need to copy the respective JAR files (flink-s3-fs-hadoop-1.10.0.jar and flink-s3-fs-presto-1.10.0.jar) from the opt directory to the plugins directory of your Flink installation, each into its own subdirectory (see the layout sketch after the examples below).

    • E.g.:
      1. Copy /flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar to /flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar // recommended for the StreamingFileSink
      2. Copy /flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar to /flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar // recommended for checkpointing
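Flink loads each plugin from its own subdirectory under plugins/, so after copying, the relevant part of the installation should look roughly like this:

flink-1.10.0/
├── opt/
│   ├── flink-s3-fs-hadoop-1.10.0.jar
│   └── flink-s3-fs-presto-1.10.0.jar
└── plugins/
    ├── s3-fs-hadoop/
    │   └── flink-s3-fs-hadoop-1.10.0.jar
    └── s3-fs-presto/
        └── flink-s3-fs-presto-1.10.0.jar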
  3. Add this to the checkpointing code:

// requires: import org.apache.flink.runtime.state.filesystem.FsStateBackend;
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"));
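For context, here is a minimal self-contained sketch of a job using that backend; the class name, the 60-second interval, and the placeholder pipeline are illustrative additions, not part of the original answer:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToS3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 60 seconds (illustrative interval).
        env.enableCheckpointing(60_000);

        // Write checkpoint data to the bucket/path configured above.
        env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"));

        env.fromElements(1, 2, 3).print(); // placeholder pipeline

        env.execute("checkpoint-to-s3-example");
    }
}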
  4. After completing all the above steps, restart Flink if it is already running.

Note:

  • If you are using both plugins (flink-s3-fs-hadoop and flink-s3-fs-presto) in Flink, then use s3p:// specifically for flink-s3-fs-presto and s3a:// for flink-s3-fs-hadoop, instead of s3://.
  • For more details, see the Flink documentation on S3 file systems.

  • One more thing: it is recommended to use flink-s3-fs-presto for checkpointing, not flink-s3-fs-hadoop. The Hadoop S3 file system tries to mimic a real file system on top of S3; as a consequence, it has high latency when creating files and quickly hits request rate limits. This is because, before writing a key, it checks whether the "parent directory" exists, which can involve a bunch of expensive S3 HEAD requests (whose request rate limit is very low).
  • In addition, with Hadoop S3 you may see recovery operations fail because a state file appears not to exist (the HEAD requests cause wrong caching in S3's load balancers). The file only becomes visible some time later, and only then does recovery succeed.
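Following that recommendation, the checkpoint line from step 3 can pin checkpoints to the presto plugin explicitly; a minimal sketch, reusing the placeholder bucket from above:

// Use the s3p:// scheme so checkpoints go through flink-s3-fs-presto,
// leaving s3a:// (flink-s3-fs-hadoop) for the StreamingFileSink.
env.setStateBackend(new FsStateBackend("s3p://s3-bucket/checkpoints/"));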