zer*_*323 60
没有什么重要的区别,但基本的是与血统发生的事情.Persist/ cache保持血统完整,同时checkpoint打破血统.让我们考虑以下示例:
import org.apache.spark.storage.StorageLevel
val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
cache/ persist:
val indCache = rdd.mapValues(_ > 4)
indCache.persist(StorageLevel.DISK_ONLY)
indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
// | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
// +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
// | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
indCache.count
// 3
indCache.toDebugString
// (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
// | CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
// | ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
// +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
// | ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
Run Code Online (Sandbox Code Playgroud)checkpoint:
val indChk = rdd.mapValues(_ > 4)
indChk.checkpoint
// indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
// | ShuffledRDD[3] at reduceByKey at <console>:21 []
// +-(8) MapPartitionsRDD[2] at map at <console>:21 []
// | ParallelCollectionRDD[1] at parallelize at <console>:21 []
indChk.count
// 3
indChk.toDebugString
// (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
// | ReliableCheckpointRDD[12] at count at <console>:27 []
Run Code Online (Sandbox Code Playgroud)正如您在第一种情况下所看到的那样,即使从缓存中提取数据,也会保留沿袭.这意味着如果某些分区indCache丢失,可以从头开始重新计算数据.在第二种情况下,谱系在检查点之后完全丢失,indChk并且不再携带重建它所需的信息.
checkpoint,与cache/ persist与其他工作分开计算.这就是为什么应该缓存标记为检查点的RDD的原因:
强烈建议将此RDD保留在内存中,否则将其保存在文件中将需要重新计算.
最后checkpointed数据是持久的,在SparkContext销毁后不会被删除.
关于在非本地模式下运行时需求路径SparkContext.setCheckpointDir使用的数据存储.否则它也可以是本地文件系统.并没有复制应该使用本地文件系统.RDD.checkpointDFSlocalCheckpointpersist
注意:
RDD检查点与Spark Streaming中的chekpointing不同.前一个旨在解决沿袭问题,后一个是关于流可靠性和故障恢复.
小智 28
我想你可以在这里找到一个非常详细的答案
我会说,虽然很难在该页面中总结所有内容
坚持
检查点
您可能希望阅读本文以了解Spark的检查点或缓存操作的更多详细信息或内部信息.
Persist(MEMORY_AND_DISK)将数据帧临时存储到磁盘和内存中,而不会破坏程序的谱系,即df.rdd.toDebugString()将返回相同的输出。建议在计算中使用persist(*),它将被重用以避免重新计算中间结果:
df = df.persist(StorageLevel.MEMORY_AND_DISK)
calculation1(df)
calculation2(df)
Run Code Online (Sandbox Code Playgroud)
请注意,缓存数据帧并不能保证它将保留在内存中,直到您下次调用它为止。根据内存使用情况,可以丢弃缓存。
另一方面,checkpoint()中断沿袭并强制将数据帧存储在磁盘上。与使用cache()/ persist()不同,频繁的检查点会降低程序的速度。建议在以下情况下使用检查点:a)在不稳定的环境中工作,以便从故障中快速恢复b)当RDD的新条目依赖于先前条目时,存储计算的中间状态,即避免在失败的情况下重新计算长的依赖链
| 归档时间: |
|
| 查看次数: |
33356 次 |
| 最近记录: |