Spark 数据帧检查点清理

awe*_*eis 7 hive scala apache-spark

我在 spark 中有一个数据框,其中已加载来自 Hive 的整个分区,我需要在对数据进行一些修改后打破沿袭以覆盖同一分区。但是,当 Spark 作业完成后,我只剩下 HDFS 上检查点的数据。为什么 Spark 不能自行清理它,或者我遗漏了什么?

spark.sparkContext.setCheckpointDir("/home/user/checkpoint/")
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val df = spark.table("db.my_table").filter(col("partition").equal(2))

// ... transformations to the dataframe

val checkpointDf = df.checkpoint()
checkpointDf.write.format("parquet").mode(SaveMode.Overwrite).insertInto("db.my_table")
Run Code Online (Sandbox Code Playgroud)

在此之后,我在 HDFS 上有了这个文件:

/home/user/checkpoint/214797f2-ce2e-4962-973d-8f215e5d5dd8/rdd-23/part-00000
Run Code Online (Sandbox Code Playgroud)

每次我运行 spark 作业时,我都会得到一个新目录,其中包含一个新的唯一 ID,其中包含数据帧中每个 RDD 的文件。

gge*_*eop 8

Spark 具有用于检查点文件清理的隐式机制。

在 spark-defaults.conf 中添加此属性。

spark.cleaner.referenceTracking.cleanCheckpoints  true #Default is false
Run Code Online (Sandbox Code Playgroud)

您可以在Spark 官方配置页面中找到更多关于 Spark 配置的信息

如果你想从 HDFS 中删除检查点目录,你可以用 Python 删除它,在你的脚本末尾你可以使用这个命令rmtree

此属性spark.cleaner.referenceTracking.cleanCheckpointstrue,允许清除检查点目录中的旧检查点文件。