Ank*_*try 8 scala lazy-evaluation apache-spark
我正在实现一个spark应用程序,下面是一个示例代码段(不完全相同的代码):
val rdd1 = sc.textfile(HDFS_PATH)
val rdd2 = rdd1.map(func)
rdd2.persist(StorageLevel.MEMORY_AND_DISK)
println(rdd2.count)
Run Code Online (Sandbox Code Playgroud)
在从Spark Application Master UI检查此代码的性能时,我看到了该count
操作的条目,但没有看到该条目的条目persist
.此计数操作的DAG还具有"映射"转换的节点(上述代码的第2行).
可以安全地得出结论,当count
遇到(在最后一行)时执行地图转换,而不是在persist
遇到时?
另外,在什么时候rdd2实际上仍然存在?据我所知,在RDD上只能调用两种类型的操作 - 转换和操作.如果在count
调用动作时RDD持续存在,那么会持续被认为是转换还是动作,或者两者都没有?
Jac*_*ski 17
数据集cache
和persist
运算符是懒惰的,在你调用一个动作之前没有任何效果(并等到缓存完成,这是以后获得更好性能的额外价格).
来自Spark的官方文档RDD Persistence(带有粗体的句子):
Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集.当您持久保存RDD时,每个节点都会存储它在内存中计算的任何分区,并在该数据集(或从中派生的数据集)的其他操作中重用它们.这使得未来的行动更快(通常超过10倍).缓存是迭代算法和快速交互使用的关键工具.
您可以使用
persist()
或cache()
方法标记要保留的RDD .第一次在动作中计算它,它将保留在节点的内存中.Spark的缓存是容错的 - 如果丢失了RDD的任何分区,它将使用最初创建它的转换自动重新计算.
这正是为什么有些人(以及Spark SQL本身!)执行以下技巧的原因:
rdd2.persist(StorageLevel.MEMORY_AND_DISK).count
Run Code Online (Sandbox Code Playgroud)
触发缓存.
count
运算符相当便宜,因此净效果是缓存几乎在行之后立即执行(缓存在异步执行之前可能会有一小段延迟).
所以当你问:
会
persist
被视为转型还是行动?
我会说这是既不并认为这是一个优化提示(可能会或可能不会被执行,或者考虑过).
使用Web UI的"存储"选项卡查看已保留的数据集(作为其基础RDD).
您还可以使用(或简单地)查看cache
或persist
操作员的输出.explain
QueryExecution.optimizedPlan
val q1 = spark.range(10).groupBy('id % 5).count.cache
scala> q1.explain
== Physical Plan ==
InMemoryTableScan [(id % 5)#84L, count#83L]
+- InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)])
+- Exchange hashpartitioning((id#77L % 5)#88L, 200)
+- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)])
+- *Range (0, 10, step=1, splits=8)
scala> println(q1.queryExecution.optimizedPlan.numberedTreeString)
00 InMemoryRelation [(id % 5)#84L, count#83L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
01 +- *HashAggregate(keys=[(id#77L % 5)#88L], functions=[count(1)], output=[(id % 5)#84L, count#83L])
02 +- Exchange hashpartitioning((id#77L % 5)#88L, 200)
03 +- *HashAggregate(keys=[(id#77L % 5) AS (id#77L % 5)#88L], functions=[partial_count(1)], output=[(id#77L % 5)#88L, count#90L])
04 +- *Range (0, 10, step=1, splits=8)
// Cache sample table range5 using pure SQL
// That registers range5 to contain the output of range(5) function
spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)")
val q2 = spark.sql("SELECT * FROM range5")
scala> q2.explain
== Physical Plan ==
InMemoryTableScan [id#0L]
+- InMemoryRelation [id#0L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `range5`
+- *Range (0, 5, step=1, splits=8)
Run Code Online (Sandbox Code Playgroud)
InMemoryTableScan
物理运算符(具有InMemoryRelation
逻辑计划)是如何确保查询缓存在内存中并因此重用.
此外,Spark SQL本身使用相同的模式来触发SQL的CACHE TABLE查询的 DataFrame缓存(与RDD缓存不同,默认情况下是急切的):
if (!isLazy) {
// Performs eager caching
sparkSession.table(tableIdent).count()
}
Run Code Online (Sandbox Code Playgroud)
这意味着,就缓存而言,就缓存而言,您可能会有不同的结果.默认情况下cache
,persist
运算符是惰性的,而SQL CACHE TABLE
是急切的.
归档时间: |
|
查看次数: |
3149 次 |
最近记录: |