在(py)spark中取消持久保存所有数据帧

bja*_*ck3 26 python caching apache-spark apache-spark-sql pyspark

我是一个火花应用程序,有几点我想坚持当前的状态.这通常是在一大步之后,或缓存我想要多次使用的状态.看来,当我第二次在我的数据帧上调用缓存时,新副本会缓存到内存中.在我的应用程序中,这会在扩展时导致内存问题.即使在我当前的测试中,给定的数据帧最大约为100 MB,中间结果的累积大小也会超​​出执行程序的分配内存.请参阅下面的一个显示此行为的小示例.

cache_test.py:

from pyspark import SparkContext, HiveContext

spark_context = SparkContext(appName='cache_test')
hive_context = HiveContext(spark_context)

df = (hive_context.read
      .format('com.databricks.spark.csv')
      .load('simple_data.csv')
     )
df.cache()
df.show()

df = df.withColumn('C1+C2', df['C1'] + df['C2'])
df.cache()
df.show()

spark_context.stop()
Run Code Online (Sandbox Code Playgroud)

simple_data.csv:

1,2,3
4,5,6
7,8,9
Run Code Online (Sandbox Code Playgroud)

查看应用程序UI,有一个原始数据框的副本,与新列的副本相对应.我可以通过df.unpersist()在withColumn行之前调用来删除原始副本.这是删除缓存中间结果的推荐方法(即在每次调用之前调用unpersist cache()).

此外,是否可以清除所有缓存的对象.在我的应用程序中,有一些自然断点,我可以简单地清除所有内存,然后转到下一个文件.我想这样做而不为每个输入文件创建一个新的spark应用程序.

先感谢您!

zer*_*323 41

Spark 2.x

你可以使用Catalog.clearCache:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate
...
spark.catalog.clearCache()
Run Code Online (Sandbox Code Playgroud)

Spark 1.x

你可以使用SQLContext.clearCache哪种方法

从内存缓存中删除所有缓存的表.

from pyspark.sql import SQLContext
from pyspark import SparkContext

sqlContext = SQLContext.getOrCreate(SparkContext.getOrCreate())
...
sqlContext.clearCache()
Run Code Online (Sandbox Code Playgroud)

  • 这是目前一个很好的解决方案,因为它允许我在合理的断点处清除完整缓存。我将合并这一点,但我担心当我扩大规模并开始使用更大的数据集时,旧的缓存开始失去控制。如果我想随时清除旧缓存,建议创建一个新变量(或临时变量),并明确地取消保留旧对象。类似于:`df.cache()`; `df_new = df.withColumn('C1+C2', df['C1'] + df['C2'])`; `df_new.cache()`; `df.unpersist()`。如果这是唯一的方法,这似乎有点麻烦...... (2认同)
  • 我担心我当时做错了什么。在我的完整应用程序中,由于内存不足错误,我的作业最终会崩溃。数据帧的每个副本都相当小(小于 100 MB),但缓存似乎永远存在;即使在将输出写入文件并继续下一步之后。我会看看我是否可以生成一个较小的工作示例来展示这一点。 (2认同)
  • 谢谢你的澄清。我不确定我的观察是否与您描述的行为一致。在我的测试中,除非我使用“clearCache()”明确释放它们,否则丢失的小缓存会持续存在。这会导致内存不足错误。如果缓存在幕后被释放,即使我不使用`clearCache`,我也不希望我分配的内存饱和。即使执行程序内存不足,您是否知道为什么缓存可能不会被逐出? (2认同)

Tag*_*gar 7

我们经常使用

for (id, rdd) in sc._jsc.getPersistentRDDs().items():
    rdd.unpersist()
    print("Unpersisted {} rdd".format(id))
Run Code Online (Sandbox Code Playgroud)

哪里sc是sparkContext变量。