spark检查点和持久存储到磁盘之间有什么区别

nag*_*dra 53 apache-spark

spark检查点和持久存储到磁盘之间有什么区别.这些都存储在本地磁盘中吗?

zer*_*323 60

没有什么重要的区别,但基本的是与血统发生的事情.Persist/ cache保持血统完整,同时checkpoint打破血统.让我们考虑以下示例:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 10).map(x => (x % 3, 1)).reduceByKey(_ + _)
Run Code Online (Sandbox Code Playgroud)
  • cache/ persist:

    val indCache  = rdd.mapValues(_ > 4)
    indCache.persist(StorageLevel.DISK_ONLY)
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    indCache.count
    // 3
    
    indCache.toDebugString
    // (8) MapPartitionsRDD[13] at mapValues at <console>:24 [Disk Serialized 1x Replicated]
    //  |       CachedPartitions: 8; MemorySize: 0.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 587.0 B
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 [Disk Serialized 1x Replicated]
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 [Disk Serialized 1x Replicated]
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 [Disk Serialized 1x Replicated]
    
    Run Code Online (Sandbox Code Playgroud)
  • checkpoint:

    val indChk  = rdd.mapValues(_ > 4)
    indChk.checkpoint
    
    // indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ShuffledRDD[3] at reduceByKey at <console>:21 []
    //  +-(8) MapPartitionsRDD[2] at map at <console>:21 []
    //     |  ParallelCollectionRDD[1] at parallelize at <console>:21 []
    
    indChk.count
    // 3
    
    indChk.toDebugString
    // (8) MapPartitionsRDD[11] at mapValues at <console>:24 []
    //  |  ReliableCheckpointRDD[12] at count at <console>:27 []
    
    Run Code Online (Sandbox Code Playgroud)

正如您在第一种情况下所看到的那样,即使从缓存中提取数据,也会保留沿袭.这意味着如果某些分区indCache丢失,可以从头开始重新计算数据.在第二种情况下,谱系在检查点之后完全丢失,indChk并且不再携带重建它所需的信息.

checkpoint,与cache/ persist与其他工作分开计算.这就是为什么应该缓存标记为检查点的RDD的原因:

强烈建议将此RDD保留在内存中,否则将其保存在文件中将需要重新计算.

最后checkpointed数据是持久的,在SparkContext销毁后不会被删除.

关于在非本地模式下运行时需求路径SparkContext.setCheckpointDir使用的数据存储.否则它也可以是本地文件系统.并没有复制应该使用本地文件系统.RDD.checkpointDFSlocalCheckpointpersist

注意:

RDD检查点与Spark Streaming中的chekpointing不同.前一个旨在解决沿袭问题,后一个是关于流可靠性和故障恢复.

  • 这里有点困惑。如果我有一个很长的计算链,我决定打破它,将它分配给一个变量,然后将它缓存起来。然后我将读取该变量并继续。现在我的问题是,它与检查点有什么不同?(从重新计算的角度来看除外)。我真正的问题是,在什么情况下我们应该去检查点而不是上述过程?鉴于将来不会使用检查点 RDD。澄清会有所帮助:) (4认同)
  • 另一个重要的区别是,如果你 `persist` / `cache` 一个 RDD,然后需要计算依赖的 RDD-s,那么 Spark 会自动使用持久化/缓存的 RDD 内容来加快速度。但是如果你只是`checkpoint`相同的RDD,那么在计算依赖的RDD-s时就不会使用它。我想知道 Spark 何时使用检查点 RDD。如果出现故障,Spark 会*自动*使用它吗?或者我应该手动`spark.read`它来让RDD继续吗?这将解释为什么它从未被 Spark 删除。那么,应该如何使用检查点呢? (2认同)
  • @Mrinal - 回复:在什么情况下应该使用检查点?即使作业完成后,检查点仍然存在(而持久存储到磁盘的块可能会被清理)。因此,检查点有用的一件事(我读过)是,如果您有一个不稳定的长时间运行作业,有时会在繁忙的集群中失败,那么检查点可以帮助您更快地恢复,因为重新启动时您不必一路走下去回到工作的开始。 (2认同)

小智 28

我想你可以在这里找到一个非常详细的答案

我会说,虽然很难在该页面中总结所有内容

坚持

  • 使用StorageLevel.DISK_ONLY进行持久化或缓存会导致计算RDD的生成并将其存储在某个位置,以便后续使用该RDD在重新计算行列时不会超出该点.
  • 在调用persist之后,Spark仍会记住RDD的谱系,即使它没有调用它.
  • 其次,在应用程序终止后,缓存被清除或文件被破坏

检查点

  • 检查点将rdd物理存储到hdfs并破坏创建它的谱系.
  • 即使在Spark应用程序终止后,也不会删除检查点文件.
  • 检查点文件可用于后续作业运行或驱动程序
  • 检查点RDD会导致双重计算,因为在执行计算和写入检查点目录的实际作业之前,操作将首先调用高速缓存.

您可能希望阅读本文以了解Spark的检查点或缓存操作的更多详细信息或内部信息.


y.s*_*hyk 5

  1. Persist(MEMORY_AND_DISK)将数据帧临时存储到磁盘和内存中,而不会破坏程序的谱系,即df.rdd.toDebugString()将返回相同的输出。建议在计算中使用persist(*),它将被重用以避免重新计算中间结果:

    df = df.persist(StorageLevel.MEMORY_AND_DISK)
    calculation1(df)
    calculation2(df)
    
    Run Code Online (Sandbox Code Playgroud)

    请注意,缓存数据帧并不能保证它将保留在内存中,直到您下次调用它为止。根据内存使用情况,可以丢弃缓存。

  2. 另一方面,checkpoint()中断沿袭并强制将数据帧存储在磁盘上。与使用cache()/ persist()不同,频繁的检查点会降低程序的速度。建议在以下情况下使用检查点:a)在不稳定的环境中工作,以便从故障中快速恢复b)当RDD的新条目依赖于先前条目时,存储计算的中间状态,即避免在失败的情况下重新计算长的依赖链