And*_* Ma 15 python apache-spark apache-spark-sql pyspark
假设我有一个函数来生成 (py)spark 数据帧,将数据帧缓存到内存中作为最后一个操作。
def gen_func(inputs):
df = ... do stuff...
df.cache()
df.count()
return df
Run Code Online (Sandbox Code Playgroud)
根据我的理解,Spark 的缓存工作如下:
cache/persist
plus 一个动作 ( count()
) 时,它是从它的 DAG 计算出来的,并缓存到内存中,附加到引用它的对象上。我的问题是,假设我gen_func
用来生成一个数据框,然后覆盖原始数据框引用(可能是 afilter
或 a withColumn
)。
df=gen_func(inputs)
df=df.filter("some_col = some_val")
Run Code Online (Sandbox Code Playgroud)
在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用cache/counted
已被覆盖。这是否意味着缓存的数据帧不再可用并将被垃圾收集?这是否意味着新的后置过滤器df
将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?
我问这个是因为我最近正在修复我的代码的一些内存不足问题,在我看来,缓存可能是问题所在。然而,我还没有真正理解什么是使用缓存的安全方法的全部细节,以及人们如何可能不小心使自己的缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?
mck*_*mck 13
我做了几个实验,如下所示。显然,数据帧一旦被缓存,就会保持缓存状态getPersistentRDDs
(如查询计划等所示InMemory
),即使所有 Python 引用都被覆盖或完全删除del
,并且显式调用了垃圾收集。
实验一:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df2.select('*').explain()
del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Run Code Online (Sandbox Code Playgroud)
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}
>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
+- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#172L AS col1#174L]
+- *(1) Scan ExistingRDD[_1#172L]
>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
Run Code Online (Sandbox Code Playgroud)
实验2:
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
df.select('*').explain()
del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Run Code Online (Sandbox Code Playgroud)
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}
>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}
>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
+- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#218L AS col1#220L]
+- *(1) Scan ExistingRDD[_1#218L]
>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
Run Code Online (Sandbox Code Playgroud)
实验3(对照实验,证明unpersist
有效)
def func():
data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
data.cache()
data.count()
return data
sc._jsc.getPersistentRDDs()
df = func()
sc._jsc.getPersistentRDDs()
df2 = df.filter('col1 != 2')
df2.select('*').explain()
df.unpersist()
df2.select('*').explain()
Run Code Online (Sandbox Code Playgroud)
结果:
>>> def func():
... data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
... data.cache()
... data.count()
... return data
...
>>> sc._jsc.getPersistentRDDs()
{}
>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}
>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
+- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
+- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
+- *(1) Project [_1#310L AS col1#312L]
+- *(1) Scan ExistingRDD[_1#310L]
>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
+- *(1) Scan ExistingRDD[_1#310L]
Run Code Online (Sandbox Code Playgroud)
回答OP的问题:
这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器 df 将从头开始计算所有内容,尽管是从之前缓存的数据帧生成的?
实验表明两者都不是。数据帧保持缓存状态,不会被垃圾收集,并且根据查询计划,使用缓存的(不可引用的)数据帧计算新的数据帧。
一些与缓存使用相关的有用功能(如果您不想通过 Spark UI 执行此操作)包括:
sc._jsc.getPersistentRDDs()
,它显示了缓存的 RDD/数据帧的列表,以及
spark.catalog.clearCache()
,这会清除所有缓存的 RDD/数据帧。
我在执行上述操作时是否偏离了最佳实践?
我无权就此评判你,但正如评论之一所建议的,避免重新分配,df
因为数据帧是不可变的。尝试想象您正在使用 scala 进行编码,并将其定义df
为val
. 做起来df = df.filter(...)
是不可能的。Python 本身无法强制执行这一点,但我认为最佳实践是避免覆盖任何数据帧变量,这样df.unpersist()
如果您不再需要缓存的结果,您可以随时调用。
我想提出几点希望澄清 Spark 在缓存方面的行为。
当你有一个
df = ... do stuff...
df.cache()
df.count()
Run Code Online (Sandbox Code Playgroud)
...然后在你的应用程序的其他地方
another_df = ... do *same* stuff...
another_df.*some_action()*
Run Code Online (Sandbox Code Playgroud)
...,您希望another_df
重用缓存的df
数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点后,Spark 开发人员决定使用分析后的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖应用程序端的引用。在 Spark 中,CacheManager
是在索引序列中跟踪缓存计算的组件cachedData
:
/**
* Maintains the list of cached plans as an immutable sequence. Any updates to the list
* should be protected in a "this.synchronized" block which includes the reading of the
* existing value and the update of the cachedData var.
*/
@transient @volatile
private var cachedData = IndexedSeq[CachedData]()
Run Code Online (Sandbox Code Playgroud)
在查询计划期间(在缓存管理器阶段),会扫描此结构以查找正在分析的计划的所有子树,以查看其中是否有任何子树已被计算。如果找到匹配项,Spark 会用相应的InMemoryRelation
from替换该子树cachedData
。
cache()
( 的简单同义词)函数通过调用来存储persist()
具有存储级别的数据帧MEMORY_AND_DISK
cacheQuery(...)
CacheManager
/**
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
def cacheQuery(...
Run Code Online (Sandbox Code Playgroud)
请注意,这与使用级别的 RDD 缓存不同MEMORY_ONLY
。一旦缓存的数据帧保持缓存在内存中或本地执行器磁盘上,直到它们被显式“编辑”或调用unpersist
CacheManager 。clearCache()
当执行器存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但不会简单地“删除”。
好问题,顺便说一句...
归档时间: |
|
查看次数: |
2456 次 |
最近记录: |