什么时候缓存和持久执行(因为它们看起来不像是动作)?

Ank*_*try 8 scala lazy-evaluation apache-spark

我正在实现一个spark应用程序,下面是一个示例代码段(不完全相同的代码):

val rdd1 = sc.textfile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)
Run Code Online (Sandbox Code Playgroud)

在从Spark Application Master UI检查此代码的性能时,我看到了该count操作的条目,但没有看到该条目的条目persist.此计数操作的DAG还具有"映射"转换的节点(上述代码的第2行).

可以安全地得出结论,当count遇到(在最后一行)时执行地图转换,而不是在persist遇到时?

另外,在什么时候rdd2实际上仍然存在?据我所知,在RDD上只能调用两种类型的操作 - 转换和操作.如果在count调用动作时RDD持续存在,那么会持续被认为是转换还是动作,或者两者都没有?

Jac*_*ski 17

数据集cachepersist运算符是懒惰的,在你调用一个动作之前没有任何效果(并等到缓存完成,这是以后获得更好性能的额外价格).

来自Spark的官方文档RDD Persistence(带有粗体的句子):

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集.当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们.这使得未来的行动更快(通常超过10倍).缓存是迭代算法和快速交互使用的关键工具.

您可以使用persist()cache()方法标记要保留的RDD .第一次在动作中计算它,它将保留在节点的内存中.Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用最初创建它的转换自动重新计算.

这正是为什么有些人(以及Spark SQL本身!)执行以下技巧的原因:

rdd2.persist(StorageLevel.MEMORY_AND_DISK).count
Run Code Online (Sandbox Code Playgroud)

触发缓存.

count 运算符相当便宜,因此净效果是缓存几乎在行之后立即执行(缓存在异步执行之前可能会有一小段延迟).

所以当你问:

persist被视为转型还是行动?

我会说这是既不并认为这是一个优化提示(可能会或可能不会被执行,或者考虑).


使用Web UI的"存储"选项卡查看已保留的数据集(作为其基础RDD).

在此输入图像描述

您还可以使用(或简单地)查看cachepersist操作员的输出.explainQueryExecution.optimizedPlan

val q1 = spark.range(10).groupBy('id % 5).count.cache
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [(id % 5)#84L, count#83L]
   +- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
         +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)])
            +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
               +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)])
                  +- *Range (0, 10, step=1, splits=8)

scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01    +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02       +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03          +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04             +- *Range (0, 10, step=1, splits=8)

// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
   +- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
         +- *Range (0, 5, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)

InMemoryTableScan物理运算符(具有InMemoryRelation逻辑计划)是如何确保查询缓存在内存中并因此重用.


此外,Spark SQL本身使用相同的模式来触发SQL的CACHE TABLE查询的 DataFrame缓存(与RDD缓存不同,默认情况下是急切的):

if (!isLazy) {
  // Performs eager caching
  sparkSession.table(tableIdent).count()
}
Run Code Online (Sandbox Code Playgroud)

这意味着,就缓存而言,就缓存而言,您可能会有不同的结果.默认情况下cache,persist运算符是惰性的,而SQL CACHE TABLE是急切的.

  • 谢谢Jacek!这为Spark用户提供了深刻的见解,这对于所有类型的用户来说都是非常了解的。您对SO的回答是惊人的:)博客也是如此! (2认同)
  • @AnkitKhettry感谢您的客气话。如果您喜欢,请给答案加分(以保持我的自我... ekhm ...的积极性:)) (2认同)