有没有一种惯用的方法来缓存 Spark 数据帧?

Jef*_*ong 2 dataframe apache-spark apache-spark-sql pyspark

我有一个很大的镶木地板数据集,我正在用 Spark 阅读。读取后,我过滤了在应用不同转换的许多函数中使用的行子集:

以下与我要完成的工作类似但不完全符合逻辑:

df = spark.read.parquet(file)
special_rows = df.filter(col('special') > 0)

# Thinking about adding the following line
special_rows.cache()

def f1(df):
  new_df_1 = df.withColumn('foo', lit(0))
  return new_df_1

def f2(df):
  new_df_2 = df.withColumn('foo', lit(1))
  return new_df_2

new_df_1 = f1(special_rows)
new_df_2 = f2(special_rows)
output_df = new_df_1.union(new_df_2)
output_df.write.parquet(location)
Run Code Online (Sandbox Code Playgroud)

因为许多函数可能正在使用这个过滤的行子集,我想缓存或持久化它,以便潜在地加快执行速度/内存消耗。我知道在上面的例子中,在我最终写入parquet.

我的问题是,我需要插入某种调用的count(),例如,以触发高速缓存,或者如果拼花调用,最终写在星火就能看到这个数据帧正在被使用f1,并f2和意志缓存数据帧本身。

如果是,这是一种惯用的方法吗?这是否意味着在依赖缓存的生产和大规模 Spark 作业中,经常使用随机操作来抢先对数据帧执行操作,例如调用count?

mck*_*mck 5

在我最后一次写入镶木地板之前,不会调用任何操作。

在最后一次写入 parquet 调用期间,Spark 将能够看到此数据帧正在 f1 和 f2 中使用,并将缓存数据帧本身。

是正确的。如果这样做output_df.explain(),您将看到查询计划,这将表明您所说的是正确的。

因此,没有必要做special_rows.cache()。通常,cache仅当您打算强制 Spark 计算某些内容重用数据帧时才需要,例如 afterwriteshow。如果你看到自己故意调用count(),你可能做错了什么。