是否应该在数据集上同时使用缓存和检查点?如果是这样,它如何在后台运行?

oir*_*ine 5 apache-spark apache-spark-sql apache-spark-dataset

我正在一个Spark ML管道上工作,在该管道上我们会在较大的数据集上看到OOM错误。在训练之前我们正在使用cache(); 我换了一下checkpoint(),我们的内存需求大大下降了。然而,在文档进行RDDcheckpoint(),它说:

强烈建议将该RDD保留在内存中,否则将其保存在文件中将需要重新计算。

DataSet我正在使用的检查点未提供相同的指导。无论如何,遵循以上建议,我发现cache()单独使用内存的需求实际上有所增加。

我的期望是当我们这样做时

...
ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)

对检查点的调用会强制对进行评估,该评估会DataSet在被检查点之前同时缓存。之后,任何对的引用都ds将引用缓存的分区,并且如果需要更多的内存并且将分区撤离,将使用检查点分区,而不是重新评估它们。这是真的吗,还是在幕后发生了什么变化?理想情况下,如果可能的话,我希望将DataSet保留在内存中,但是从内存的角度来看,使用缓存和检查点方法似乎没有任何好处。

use*_*362 5

TL; DR你不会从内存中缓存中受益(默认存储水平DatasetMEMORY_AND_DISK无论如何)的后续行动,但你还是应该考虑缓存,如果计算ds是昂贵的。

说明

您的期望

ds.cache()
ds.checkpoint()
...
Run Code Online (Sandbox Code Playgroud)

调用检查点会强制评估数据集

是正确的。Dataset.checkpoint具有不同的风格,可以同时进行急切和懒惰的检查点操作,默认变体为“急切”

def checkpoint(): Dataset[T] = checkpoint(eager = true, reliableCheckpoint = true)
Run Code Online (Sandbox Code Playgroud)

因此,后续操作应重用检查点文件。

但是,实际上,Spark仅适用checkpoint于internalRDD,因此评估规则没有改变。Spark首先评估操作,然后创建checkpoint(这就是为什么首先建议缓存的原因)。

因此,如果省略,ds.cache() ds则会在中进行两次评估ds.checkpoint()

因此没有任何改变,cache仍建议,尽管建议可能会小幅下跌,相比于普通的RDD,因为Dataset高速缓存被认为是计算昂贵,并根据上下文,它可能是更便宜的只需重新加载数据(注意,Dataset.count没有cache通常的优化,而Dataset.count没有它,不是-是否有任何性能问题迫使使用Spark中的count进行热切评估?)。