(为什么)我们需要在RDD上调用缓存或持久化

Ram*_*ana 161 scala apache-spark rdd

当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?

val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)

根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.

如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?

Dan*_*bos 283

大多数RDD操作都是懒惰的.将RDD视为一系列操作的描述.RDD不是数据.所以这一行:

val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)

它什么都不做.它创建了一个RDD,上面写着"我们需要加载这个文件".此时未加载该文件.

需要观察数据内容的RDD操作不能是懒惰的.(这些被称为动作.)一个例子是RDD.count- 告诉你文件中的行数,需要读取文件.因此,如果您编写textFile.count,此时将读取文件,将对行进行计数,并返回计数.

如果再打电话textFile.count怎么办?同样的事情:文件将被读取并再次计数.什么都没有存储.RDD不是数据.

那怎么RDD.cache办?如果您添加textFile.cache到上面的代码:

val textFile = sc.textFile("/user/emp.txt")
textFile.cache
Run Code Online (Sandbox Code Playgroud)

它什么都不做.RDD.cache也是一个懒惰的操作.该文件仍未读取.但是现在RDD说"读取这个文件,然后缓存内容".如果您textFile.count第一次运行,则将加载,缓存和计算该文件.如果textFile.count再次呼叫,操作将使用缓存.它只会从缓存中获取数据并计算行数.

缓存行为取决于可用内存.例如,如果文件不适合内存,textFile.count则会回退到通常的行为并重新读取文件.

  • Spark假设文件永远不会改变.它在任意时间点读取文件,并可在以后根据需要重新读取部分文件.(例如,如果从缓存中推出了一段数据.)因此,您最好保持文件不变!只需在有新数据时使用新名称创建一个新文件,然后将其作为新RDD加载.如果您不断获得新数据,请查看Spark Streaming. (19认同)
  • 是.RDD是不可变的,因此每个RDD都假定其依赖性也是不可变的.Spark Streaming允许您设置可在更改流上运行的树.但更简单的解决方案是在一个以文件名作为参数的函数中构建树.然后只需调用新文件和poof的函数,你就有了新的计算树. (9认同)
  • 嗨丹尼尔, - 当你调用缓存时,这是否意味着RDD没有从源重新加载(例如文本文件) - 如何确保文本文件中的数据在缓存时是最新的?(确实可以解决这个问题,还是定期进行手动操作,以确保源数据在以后的血统中重新计算?) (4认同)

maa*_*asg 180

我认为这个问题会更好地表达为:

我们什么时候需要在RDD上调用缓存或持久化?

Spark流程是懒惰的,也就是说,在需要之前不会发生任何事情.为了快速回答这个问题,在val textFile = sc.textFile("/user/emp.txt")发布之后,数据没有任何反应,只HadoopRDD使用文件作为源构建了一个.

假设我们稍微改变了这些数据:

val wordsRDD = textFile.flatMap(line => line.split("\\W"))
Run Code Online (Sandbox Code Playgroud)

同样,数据没有任何反应.现在有一个新的RDD wordsRDD,它包含一个引用testFile和一个需要时应用的函数.

只有在RDD上调用动作时wordsRDD.count,才会执行称为沿袭的RDD链.也就是说,分区中分解的数据将由Spark集群的执行程序加载,flatMap将应用该函数并计算结果.

在线性谱系上,cache()不需要像本例中的那个.数据将被加载到执行程序,所有转换将被应用,最后count将被计算,全部在内存中 - 如果数据适合内存.

cache当RDD的分支分支出来时很有用.假设您想要将前一个示例中的单词过滤为正面和负面单词的计数.你可以这样做:

val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
Run Code Online (Sandbox Code Playgroud)

在这里,每个分支发出数据的重新加载.添加显式cache语句将确保保留并重用以前完成的处理.工作将如下所示:

val textFile = sc.textFile("/user/emp.txt")
val wordsRDD = textFile.flatMap(line => line.split("\\W"))
wordsRDD.cache()
val positiveWordsCount = wordsRDD.filter(word => isPositive(word)).count()
val negativeWordsCount = wordsRDD.filter(word => isNegative(word)).count()
Run Code Online (Sandbox Code Playgroud)

出于这个原因,cache据说"打破血统",因为它创建了一个可以重复用于进一步处理的检查点.

经验法则:cache当RDD的分支分支出来或多次使用RDD时使用,如循环中一样.

  • @maasg纠正我,如果我错了,但既不"缓存"也不"坚持"可以_break the lineage_. (5认同)
  • 惊人的。谢谢。还有一个相关的问题。当我们缓存或持久化时,数据将存储在执行器的内存或工作节点的内存中。如果是 executor 的内存,Spark 如何识别哪个 executor 有数据。 (2认同)

eli*_*sah 27

我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?

是的,仅在需要时.

RDD数据默认以分布式方式存储在内存中?

没有!

这些是为什么:

  • Spark支持两种类型的共享变量:广播变量,可用于缓存所有节点的内存中的值;累加器,它们是仅"添加"到的变量,例如计数器和总和.

  • RDD支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在数据集上运行计算后将值返回到驱动程序).例如,map是一个转换,它通过一个函数传递每个数据集元素,并返回一个表示结果的新RDD.另一方面,reduce是一个使用某个函数聚合RDD的所有元素的操作,并将最终结果返回给驱动程序(尽管还有一个返回分布式数据集的并行reduceByKey).

  • Spark中的所有转换都是惰性的,因为它们不会立即计算结果.相反,他们只记得应用于某些基础数据集的转换(例如文件).仅当操作需要将结果返回到驱动程序时才会计算转换.这种设计使Spark能够更有效地运行 - 例如,我们可以意识到通过map创建的数据集将用于reduce,并仅将reduce的结果返回给驱动程序,而不是更大的映射数据集.

  • 默认情况下,每次对其执行操作时,都可以重新计算每个转换后的RDD.但是,您也可以使用持久化(或缓存)方法在内存中保留RDD,在这种情况下,Spark会在群集上保留元素,以便在下次查询时更快地访问.还支持在磁盘上保留RDD或在多个节点上复制.

有关更多详细信息,请查看Spark编程指南.

  • 这是一个很好的答案,我不知道为什么它被downvoted.这是一个自上而下的答案,解释了RDD如何从高级概念中发挥作用.我添加了另一个自下而上的答案:从"这行做什么"开始.对于刚开始使用Spark的人来说,也许更容易理解. (2认同)

ril*_*yss 7

下面是应缓存RDD的三种情况:

多次使用RDD

在同一RDD上执行多项操作

用于长链(或非常昂贵的)转换


zin*_*ing 6

添加另一个原因来添加(或临时添加)cache方法调用。

用于调试内存问题

with cache方法,spark将提供有关RDD大小的调试信息。因此在spark集成UI中,您将获得RDD内存消耗信息。事实证明,这对诊断内存问题非常有帮助。