PySpark:完全清理检查点

TMi*_*hel 5 apache-spark pyspark

根据文档,可以告诉 Spark 跟踪“超出范围”的检查点——那些不再需要的检查点——并从磁盘中清除它们。

SparkSession.builder
  ...
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

显然它这样做了,但问题是,最后一个检查点的 rdd 永远不会被删除。

  • 执行所有清理时是否缺少任何配置?
  • 如果没有:有没有办法获取为特定应用程序创建的临时文件夹的名称,以便我可以以编程方式删除它?即0c514fb8-498c-4455-b147-aff242bd7381SparkContext相同的方式获取applicationId

SMa*_*MaZ 9

我知道它的老问题,但最近我正在探索checkpoint并遇到类似的问题。想分享研究结果。

问题:执行所有清理时是否缺少任何配置?

设置spark.cleaner.referenceTracking.cleanCheckpoints=true有时会起作用,但很难依赖它。官方文档说通过设置这个属性

如果引用超出范围,则清理检查点文件

我不知道它到底是什么意思,因为我的理解是一旦 spark session/context 停止,它应该清理它。

但是,我找到了您的以下问题的答案

如果没有:有没有办法获取为特定应用程序创建的临时文件夹的名称,以便我可以以编程方式删除它?即从 SparkContext 获取 0c514fb8-498c-4455-b147-aff242bd7381 与获取 applicationId 的方式相同

是的,我们可以得到如下checkpointed目录:

斯卡拉:

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path 
Run Code Online (Sandbox Code Playgroud)

PySpark:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t 
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

// notice 'u' at the start which means It returns unicode object
// Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True
Run Code Online (Sandbox Code Playgroud)