如果两个阶段使用相同的 DataFrame,spark 是否会读取同一文件两次?

fig*_*uts 7 apache-spark apache-spark-sql pyspark

以下代码读取相同的 csv 两次,即使只调用一个操作

端到端可运行示例:

import pandas as pd
import numpy as np

df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()

schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema = schema)

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')

df3.explain()
df3.count()
Run Code Online (Sandbox Code Playgroud)

Web UI 中的 sql 选项卡显示以下内容:

在此输入图像描述

如您所见,df1 文件被读取了两次。这是预期的行为吗?为什么会发生这种情况?我只有一项操作,因此管道的同一部分不应运行多次。

我已经在这里阅读了答案。问题几乎是相同的,但是在该问题中使用了 RDD,并且我在 pyspark API 中使用了数据帧。在这个问题中,建议如果要避免多个文件扫描,那么 DataFrames API 会有所帮助,这就是 DataFrama API 存在的首要原因

然而,事实证明,我也面临着与 DataFrame 完全相同的问题。Spark 因其效率而闻名,但效率如此低下,这似乎很奇怪(大多数情况下我只是错过了一些东西,这不是一个有效的批评:))

Koe*_*dlt 2

我的看法是分阶段的。每个阶段都是在数据的不同分区上并行运行的相同任务的集合。

您的查询有 4 个阶段:

在此输入图像描述

正如您所注意到的,Stage 0两者Stage 1都读取您的 CSV 文件。但这是有道理的,Stage 0并且Stage 1彼此完全独立。他们可能正在读取相同的数据,但正在做不同的事情。

一般来说,在开始计算中Stage 0的任何任务之前,将首先执行中的所有任务。Stage 1因此,如果您想避免重复读入数据,则需要执行以下任一操作:

  • 有一个计算 2 个输出的阶段(在本例中为 的两个输入Stage 2)。这将显着改变 Spark 架构,因为现在始终只存在具有 1 个输出的阶段。

  • Spark 确实可以(在幕后)(就像 thebluephantom 所说的那样)为.cache您决定这个数据集,但这意味着它实际上填满了您的存储内存,而您甚至没有提出这个要求(并且有进一步计算性能降低的风险)。如果底层进程实际上开始缓存您的数据,那么您将很难知道存储内存的填充情况。