我试图理解Spark的缓存是如何工作的.
这是我天真的理解,如果我遗漏了什么,请告诉我:
val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
Run Code Online (Sandbox Code Playgroud)
在上面,rdd1将仅从磁盘(例如HDFS)加载一次.(当我保留rdd2时),然后从缓存(假设有足够的RAM)保存rdd3时)
现在这是我的问题.假设我想缓存rdd2和rdd3,因为它们将在以后使用,但在创建后我不需要rdd1.
基本上有重复,不是吗?因为一旦计算出rdd2和rdd3,我就不再需要rdd1了,我应该可以解决它,对吧?问题是什么时候?
这会有用吗?(选项A)
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
Run Code Online (Sandbox Code Playgroud)
火花是否会给DAG增加无人值守的召唤?还是立即完成?如果它立即完成,那么当我从rdd2和rdd3读取时,基本上rdd1将被非缓存,对吗?
我应该这样做(选项B)吗?
val rdd1 = sc.textFile("some data")
rdd1.cache() // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
rdd1.unpersist()
Run Code Online (Sandbox Code Playgroud)
所以问题是:
选项A是否足够好?ie rdd1仍然只加载一次文件?或者我需要选择B吗?
最近我正在使用带有 JDBC 数据源的 Spark。考虑以下片段:
val df = spark.read.(options).format("jdbc").load();
val newDF = df.where(PRED)
Run Code Online (Sandbox Code Playgroud)
PRED 是谓词列表。
如果 PRED 是一个简单的谓词,比如x = 10,查询会快得多。但是,如果有一些不相等的条件,例如date > someOtherDate or date < someOtherDate2,查询比没有谓词下推要慢得多。您可能知道,数据库引擎对此类谓词的扫描速度非常慢,在我的情况下甚至慢 10 倍(!)。
为了防止不必要的谓词下推,我使用了:
val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)
Run Code Online (Sandbox Code Playgroud)
但它需要大量内存并且 - 由于这里提到的问题 - Spark' Dataset unpersist 行为- 我无法 unpersist cachedDF。
还有其他选择可以避免下推谓词吗?没有缓存也没有编写自己的数据源?
注意:即使有关闭谓词下推的选项,它也仅适用于其他查询可能仍在使用它。所以,如果我写:
// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2)// where …Run Code Online (Sandbox Code Playgroud)