Asked by Kri*_*ian · Tags: apache-spark, pyspark
I set the checkpoint directory with the sc.setCheckpointDir method:
/checkpointDirectory/
Then I created a checkpoint of an RDD with rdd.checkpoint(). In the checkpoint directory I now see a new subdirectory, named with a random string, that represents the new checkpoint. There is nothing inside it:
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Then, after performing a few transformations, I ran rdd.checkpoint() again, and the most recently created directory still contains nothing:
/checkpointDirectory/37d2812a-bca2-4fc5-b5d4-221ae03a6d25/ [empty]
Am I using checkpoint() incorrectly? What should I see in that directory to know it is working properly?
checkpoint, like many other operations in Spark, is lazy. The data is actually checkpointed if and only if a given RDD is materialized. The empty directory you see is the application-specific checkpoint directory.

If you want checkpointing to take place, you have to trigger an action that evaluates the corresponding RDD. By example (local mode):
import glob
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.parallelize(range(1000), 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
You can also analyze the debug string before the action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]
and after the action:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]
As you can see, before the action the RDD would be computed from scratch, but after count you get a ReliableCheckpointRDD.