结构化流 - 无法使用FileContext API管理AWS S3上的元数据日志文件

him*_*ian 1 scala amazon-s3 apache-spark spark-structured-streaming

我在Spark中有一个StreamingQuery(v2.2.0),即

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "s3n://bucket/checkpoint/test")
  .option("path", "s3n://bucket/test")
  .start()
Run Code Online (Sandbox Code Playgroud)

当我运行时,query数据确实在AWS S3上保存并且创建了检查点s3n://bucket/checkpoint/test.但是,我也在日志中收到以下警告:

WARN [oassestreaming.OffsetSeqLog]无法使用FileContext API在路径s3n:// bucket/checpoint/test/offsets管理元数据日志文件.使用FileSystem API代替管理日志文件.失败时日志可能不一致.

我无法理解为什么会出现这种警告.此外,如果发生任何故障,我的检查点会不一致吗?

任何人都可以帮我解决它吗?

Sha*_*ica 5

查看源代码,此错误来自HDFSMetadataLog类.代码中的注释指出:

注意:[[HDFSMetadataLog]]不支持类似S3的文件系统,因为它们不保证目录中的列表文件始终显示最新文件.

所以问题是由于使用AWS S3,它将迫使您使用FileSystemManagerAPI.检查该课程的评论,我们看到,

使用较旧的FileSystem API实现FileManager.请注意,此实现无法提供路径的原子重命名,因此可能导致一致性问题.当不能使用FileContextManager时,这应仅用作备份选项.

因此,当多个编写者想要同时进行重命名操作时,可能会出现一些问题.有一个相关的票在这里,但是,它已经关闭,因为这个问题不能固定在星火.

如果需要在S3上检查点,需要考虑的一些事项:

  1. 为了避免警告和潜在的麻烦,检查点到HDFS,然后复制结果
  2. 检查点到S3,但检查点之间有很长的差距.