在 spark 2.3.0 中的结构化流中禁用 _spark_metadata

Aji*_*nan 5 apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming

我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错

--conf "spark.hadoop.parquet.enable.summary-metadata=false"

当应用程序启动时,不会_spark_metadata生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?

Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
        at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud)

Aji*_*nan 0

发生这种情况的原因是 kafkacheckpoint 文件夹未清理。kafka 检查点内的文件交叉引用 Spark 元数据文件并失败。一旦我删除了这两个文件,它就开始工作