相关疑难解决方法(0)

了解Spark的缓存

我试图理解Spark的缓存是如何工作的.

这是我天真的理解,如果我遗漏了什么,请告诉我:

val rdd1 = sc.textFile("some data")
rdd1.cache() //marks rdd1 as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")
Run Code Online (Sandbox Code Playgroud)

在上面,rdd1将仅从磁盘(例如HDFS)加载一次.(当我保留rdd2时),然后从缓存(假设有足够的RAM)保存rdd3时)

现在这是我的问题.假设我想缓存rdd2和rdd3,因为它们将在以后使用,但在创建后我不需要rdd1.

基本上有重复,不是吗?因为一旦计算出rdd2和rdd3,我就不再需要rdd1了,我应该可以解决它,对吧?问题是什么时候?

这会有用吗?(选项A)

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)
rdd2.cache()
rdd3.cache()
rdd1.unpersist()
Run Code Online (Sandbox Code Playgroud)

火花是否会给DAG增加无人值守的召唤?还是立即完成?如果它立即完成,那么当我从rdd2和rdd3读取时,基本上rdd1将被非缓存,对吗?

我应该这样做(选项B)吗?

val rdd1 = sc.textFile("some data")
rdd1.cache()   // marks rdd as cached
val rdd2 = rdd1.filter(...)
val rdd3 = rdd1.map(...)

rdd2.cache()
rdd3.cache()

rdd2.saveAsTextFile("...")
rdd3.saveAsTextFile("...")

rdd1.unpersist()
Run Code Online (Sandbox Code Playgroud)

所以问题是: 选项A是否足够好?ie rdd1仍然只加载一次文件?或者我需要选择B吗?

apache-spark

34
推荐指数
1
解决办法
2万
查看次数

如何防止谓词下推?

最近我正在使用带有 JDBC 数据源的 Spark。考虑以下片段:

val df = spark.read.(options).format("jdbc").load();
val newDF = df.where(PRED)
Run Code Online (Sandbox Code Playgroud)

PRED 是谓词列表。

如果 PRED 是一个简单的谓词,比如x = 10,查询会快得多。但是,如果有一些不相等的条件,例如date > someOtherDate or date < someOtherDate2,查询比没有谓词下推要慢得多。您可能知道,数据库引擎对此类谓词的扫描速度非常慢,在我的情况下甚至慢 10 倍(!)。

为了防止不必要的谓词下推,我使用了:

val cachedDF = df.cache()
val newDF = cachedDF.where(PRED)
Run Code Online (Sandbox Code Playgroud)

但它需要大量内存并且 - 由于这里提到的问题 - Spark' Dataset unpersist 行为- 我无法 unpersist cachedDF

还有其他选择可以避免下推谓词吗?没有缓存也没有编写自己的数据源?

注意:即使有关闭谓词下推的选项,它也仅适用于其他查询可能仍在使用它。所以,如果我写:

// some fancy option set to not push down predicates
val df1 = ...
// predicate pushdown works again
val df2 = ...
df1.join(df2)// where …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1435
查看次数

标签 统计

apache-spark ×2

apache-spark-sql ×1