压缩批处理9结构化流错误时_spark_metadata / 0不存在

Azi*_*gda 5 scala apache-kafka apache-spark spark-structured-streaming

我们有使用Spark结构化流实现的流应用程序。它将尝试从kafka主题中读取数据并将其写入HDFS位置。

有时应用程序无法给出错误:

_spark_metadata/0 doesn't exist while compacting batch 9

java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
Run Code Online (Sandbox Code Playgroud)

无法解决此问题。

我发现的唯一解决方案是删除检查点位置文件,如果再次运行该应用程序,它将从头开始读取主题/数据,这对于生产应用程序而言不是可行的解决方案。

谁能告诉这个错误的解决方案,所以我不必删除检查点,并且我可以从上次运行失败的地方继续。

删除检查点位置,它将从头开始启动应用程序并读取所有先前的数据。

应用程序示例代码:

spark.
readStream.
format("kafka")
.option("kafka.bootstrap.servers", <server list>)
.option("subscribe", <topic>)
.load()

 spark.
 writeStream.
 format("csv").
 option("format", "append").
 option("path",hdfsPath).
 option("checkpointlocation","")
 .outputmode(append).start
Run Code Online (Sandbox Code Playgroud)

需要解决方案而不删除检查指示位置

mik*_*ike 11

错误信息

_spark_metadata/n.compact doesn't exist when compacting batch n+10
Run Code Online (Sandbox Code Playgroud)

当你可以出现时

  • 将一些数据处理到启用检查点的 FileSink 中,然后
  • 停止你的流媒体工作,然后
  • 更改 FileSink 的输出目录,同时保持相同的 checkpointLocation,然后
  • 重新启动流作业

快速解决方案(不适用于生产)

只需删除 checkpointLocation 中的文件并重新启动应用程序即可。

稳定的解决方案

由于您不想删除检查点文件,因此只需将丢失的 Spark 元数据文件从文件接收器输出路径复制到输出路径即可。请参阅下文以了解什么是“丢失的 Spark 元数据文件”。

背景

要理解为什么IllegalStateException会抛出这个问题,我们需要了解所提供的文件输出路径中幕后发生的情况。让outPathBefore为该路径的名称。当您的流作业正在运行并处理数据时,该作业会创建一个文件夹outPathBefore/_spark_metadata。在该文件夹中,您将找到一个以微批量标识符命名的文件,其中包含数据已写入的文件(分区文件)列表,例如:

/home/mike/outPathBefore/_spark_metadata$ ls
0 1 2 3 4 5 6 7
Run Code Online (Sandbox Code Playgroud)

在本例中,我们有 8 个微批次的详细信息。其中一个文件的内容如下所示

/home/mike/outPathBefore/_spark_metadata$ cat 0
v1
{"path":"file:///tmp/file/before/part-00000-99bdc705-70a2-410f-92ff-7ca9c369c58b-c000.csv","size":2287,"isDir":false,"modificationTime":1616075186000,"blockReplication":1,"blockSize":33554432,"action":"add"}
Run Code Online (Sandbox Code Playgroud)

默认情况下,每第十个微批次,这些文件都会被压缩,这意味着文件 0、1、2、...、9 的内容将存储在名为 的压缩9.compact文件中。

此过程对后续十个批次持续进行,即在微批次 19 中,作业聚合最后 10 个文件,即 9.compact、10、11、12、...、19。

现在,假设您的流作业一直运行到微批次 15,这意味着该作业已创建以下文件:

/home/mike/outPathBefore/_spark_metadata/0
/home/mike/outPathBefore/_spark_metadata/1
...
/home/mike/outPathBefore/_spark_metadata/8
/home/mike/outPathBefore/_spark_metadata/9.compact
/home/mike/outPathBefore/_spark_metadata/10
...
/home/mike/outPathBefore/_spark_metadata/15
Run Code Online (Sandbox Code Playgroud)

在第十五个微批次之后,您停止了流作业并将文件接收器的输出路径更改为,例如,outPathAfter. 当您保持相同的检查点位置时,流作业将以微批次 16 继续。但是,它现在在新的输出路径中创建元数据文件:

/home/mike/outPathAfter/_spark_metadata/16
/home/mike/outPathAfter/_spark_metadata/17
...
Run Code Online (Sandbox Code Playgroud)

现在,这就是抛出异常的地方:当到达微批次 19 时,作业尝试压缩 Spark 元数据文件夹中的第十个最新文件。但是,它只能找到文件 16、17、18,但找不到 9.compact、10 等。因此错误消息显示:

java.lang.IllegalStateException: history/1523305060336/_spark_metadata/9.compact doesn't exist when compacting batch 19 (compactInterval: 10)
Run Code Online (Sandbox Code Playgroud)

文档

结构化流编程指南解释了流查询更改后的恢复语义

“不允许更改文件接收器的输出目录: sdf.writeStream.format("parquet").option("path", "/somePath") 到 sdf.writeStream.format("parquet").option("路径", "/otherPath")"

Databricks 还在文章Streaming with File Sink: Problems with recovery if youchange checkpoint or output paths中写了一些详细信息


Man*_* Do 3

checkpointLocation由于checkpointLocation存储旧的或删除的数据信息而导致的错误。您只需删除包含checkpointLocation.

探索更多: https: //kb.databricks.com/streaming/file-sink-streaming.html

例子 :

df.writeStream
      .format("parquet")
      .outputMode("append")
      .option("checkpointLocation", "D:/path/dir/checkpointLocation")
      .option("path", "D:/path/dir/output")
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
      .awaitTermination()
Run Code Online (Sandbox Code Playgroud)

您需要删除目录checkpointLocation