RDD.checkpoint() does not store any data in the checkpoint directory

Kri*_*ian 5 apache-spark pyspark

I set the checkpoint directory with the sc.setCheckpointDir method:

/checkpointDirectory/

Then I checkpointed an RDD with rdd.checkpoint(). In the checkpoint directory I now see a new subdirectory, named with a random string, representing the new checkpoint. Inside that directory there is nothing:

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]

Then, after applying a few more transformations, I ran rdd.checkpoint() again, and the most recently created directory is still empty:

/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/  [empty]

Am I using checkpoint() incorrectly? What should I expect to see in that directory to know it is working properly?

zer*_*323 5

checkpoint, like many other operations in Spark, is lazy. The data is actually checkpointed if and only if the given RDD is materialized. The empty directory you see is the application-specific checkpoint directory.

If you want the checkpoint to happen, you have to trigger an action that evaluates the corresponding RDD. For example (local mode):

import glob
import os
from urllib.parse import urlparse

sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")

rdd = sc.range(1000)  # integers 0..999
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint()  # No checkpoint dir here yet

[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False

# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True

You can also analyze the debug string before the action:

print(plus_one.toDebugString().decode())

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |  ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]

and after the action has been executed:

print(plus_one.toDebugString().decode())

## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
##  |       CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
##  |  ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]

As you can see, before the action the RDD would be computed from scratch, but after the count it is backed by a ReliableCheckpointRDD.