如果我缓存一个 Spark 数据帧然后覆盖引用,原始数据帧还会被缓存吗?

And*_* Ma 15 python apache-spark apache-spark-sql pyspark

假设我有一个函数来生成 (py)spark 数据帧,将数据帧缓存到内存中作为最后一个操作。

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
   return df
Run Code Online (Sandbox Code Playgroud)

根据我的理解,Spark 的缓存工作如下:

  1. 当在数据帧上调用cache/persistplus 一个动作 ( count()) 时,它是从它的 DAG 计算出来的,并缓存到内存中,附加到引用它的对象上。
  2. 只要存在对该对象的引用,可能在其他函数/其他范围内,df 将继续被缓存,并且所有依赖于 df 的 DAG 将使用内存中缓存的数据作为起点。
  3. 如果对 df 的所有引用都被删除,Spark 会将缓存作为内存进行垃圾回收。它可能不会立即被垃圾回收,导致一些短期内存块(特别是如果生成缓存数据并过快丢弃它们会导致内存泄漏),但最终会被清除。

我的问题是,假设我gen_func用来生成一个数据框,然后覆盖原始数据框引用(可能是 afilter或 a withColumn)。

df=gen_func(inputs)
df=df.filter("some_col = some_val")
Run Code Online (Sandbox Code Playgroud)

在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用cache/counted已被覆盖。这是否意味着缓存的数据帧不再可用并将被垃圾收集?这是否意味着新的后置过滤器df将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?

我问这个是因为我最近正在修复我的代码的一些内存不足问题,在我看来,缓存可能是问题所在。然而,我还没有真正理解什么是使用缓存的安全方法的全部细节,以及人们如何可能不小心使自己的缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?

mck*_*mck 13

我做了几个实验,如下所示。显然,数据帧一旦被缓存,就会保持缓存状态getPersistentRDDs(如查询计划等所示InMemory),即使所有 Python 引用都被覆盖或完全删除del,并且显式调用了垃圾收集。

实验一:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
del df
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df2.select('*').explain()

del df2
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Run Code Online (Sandbox Code Playgroud)

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o234}

>>> df2 = df.filter('col1 != 2')
>>> del df
>>> import gc
>>> gc.collect()
93
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o240}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#174L) AND NOT (col1#174L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#174L], [isnotnull(col1#174L), NOT (col1#174L = 2)]
         +- InMemoryRelation [col1#174L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#172L AS col1#174L]
                  +- *(1) Scan ExistingRDD[_1#172L]

>>> del df2
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{71: JavaObject id=o250}
Run Code Online (Sandbox Code Playgroud)

实验2:

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df = df.filter('col1 != 2')
import gc
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()

df.select('*').explain()

del df
gc.collect()
sc._jvm.System.gc()
sc._jsc.getPersistentRDDs()
Run Code Online (Sandbox Code Playgroud)

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o317}

>>> df = df.filter('col1 != 2')
>>> import gc
>>> gc.collect()
244
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o323}

>>> df.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#220L) AND NOT (col1#220L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#220L], [isnotnull(col1#220L), NOT (col1#220L = 2)]
         +- InMemoryRelation [col1#220L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#218L AS col1#220L]
                  +- *(1) Scan ExistingRDD[_1#218L]

>>> del df
>>> gc.collect()
85
>>> sc._jvm.System.gc()
>>> sc._jsc.getPersistentRDDs()
{86: JavaObject id=o333}
Run Code Online (Sandbox Code Playgroud)

实验3(对照实验,证明unpersist有效)

def func():
    data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
    data.cache()
    data.count()
    return data

sc._jsc.getPersistentRDDs()

df = func()
sc._jsc.getPersistentRDDs()

df2 = df.filter('col1 != 2')
df2.select('*').explain()

df.unpersist()
df2.select('*').explain()
Run Code Online (Sandbox Code Playgroud)

结果:

>>> def func():
...     data = spark.createDataFrame([[1],[2],[3]]).toDF('col1')
...     data.cache()
...     data.count()
...     return data
...
>>> sc._jsc.getPersistentRDDs()
{}

>>> df = func()
>>> sc._jsc.getPersistentRDDs()
{116: JavaObject id=o398}

>>> df2 = df.filter('col1 != 2')
>>> df2.select('*').explain()
== Physical Plan ==
*(1) Filter (isnotnull(col1#312L) AND NOT (col1#312L = 2))
+- *(1) ColumnarToRow
   +- InMemoryTableScan [col1#312L], [isnotnull(col1#312L), NOT (col1#312L = 2)]
         +- InMemoryRelation [col1#312L], StorageLevel(disk, memory, deserialized, 1 replicas)
               +- *(1) Project [_1#310L AS col1#312L]
                  +- *(1) Scan ExistingRDD[_1#310L]

>>> df.unpersist()
DataFrame[col1: bigint]
>>> sc._jsc.getPersistentRDDs()
{}

>>> df2.select('*').explain()
== Physical Plan ==
*(1) Project [_1#310L AS col1#312L]
+- *(1) Filter (isnotnull(_1#310L) AND NOT (_1#310L = 2))
   +- *(1) Scan ExistingRDD[_1#310L]
Run Code Online (Sandbox Code Playgroud)

回答OP的问题:

这是否意味着缓存的数据帧不再可用并且将被垃圾收集?这是否意味着新的后置过滤器 df 将从头开始计算所有内容,尽管是从之前缓存的数据帧生成的?

实验表明两者都不是。数据帧保持缓存状态,不会被垃圾收集,并且根据查询计划,使用缓存的(不可引用的)数据帧计算新的数据帧。

一些与缓存使用相关的有用功能(如果您不想通过 Spark UI 执行此操作)包括:

sc._jsc.getPersistentRDDs(),它显示了缓存的 RDD/数据帧的列表,以及

spark.catalog.clearCache(),这会清除所有缓存的 RDD/数据帧。

我在执行上述操作时是否偏离了最佳实践?

我无权就此评判你,但正如评论之一所建议的,避免重新分配,df因为数据帧是不可变的。尝试想象您正在使用 scala 进行编码,并将其定义dfval. 做起来df = df.filter(...)是不可能的。Python 本身无法强制执行这一点,但我认为最佳实践是避免覆盖任何数据帧变量,这样df.unpersist()如果您不再需要缓存的结果,您可以随时调用。


maz*_*cha 5

我想提出几点希望澄清 Spark 在缓存方面的行为。

  1. 当你有一个

    df = ... do stuff...
    df.cache()
    df.count()
    
    Run Code Online (Sandbox Code Playgroud)

...然后在你的应用程序的其他地方

   another_df = ... do *same* stuff...
   another_df.*some_action()*
Run Code Online (Sandbox Code Playgroud)

...,您希望another_df重用缓存的df数据帧。毕竟,重用先前计算的结果是缓存的目标。意识到这一点后,Spark 开发人员决定使用分析后的逻辑计划作为识别缓存数据帧的“关键”,而不是仅仅依赖应用程序端的引用。在 Spark 中,CacheManager是在索引序列中跟踪缓存计算的组件cachedData

  /**
   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list
   * should be protected in a "this.synchronized" block which includes the reading of the
   * existing value and the update of the cachedData var.
   */
  @transient @volatile
  private var cachedData = IndexedSeq[CachedData]()
Run Code Online (Sandbox Code Playgroud)

在查询计划期间(在缓存管理器阶段),会扫描此结构以查找正在分析的计划的所有子树,以查看其中是否有任何子树已被计算。如果找到匹配项,Spark 会用相应的InMemoryRelationfrom替换该子树cachedData

  1. cache()( 的简单同义词)函数通过调用来存储persist()具有存储级别的数据帧MEMORY_AND_DISKcacheQuery(...)CacheManager
      /**
       * Caches the data produced by the logical representation of the given [[Dataset]].
       * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
       * recomputing the in-memory columnar representation of the underlying table is expensive.
       */
      def cacheQuery(...
Run Code Online (Sandbox Code Playgroud)

请注意,这与使用级别的 RDD 缓存不同MEMORY_ONLY。一旦缓存的数据帧保持缓存在内存中或本地执行器磁盘上,直到它们被显式“编辑”或调用unpersistCacheManager 。clearCache()当执行器存储内存完全填满时,缓存块开始使用 LRU(最近最少使用)推送到磁盘,但不会简单地“删除”。

好问题,顺便说一句...