fig*_*uts 7 apache-spark apache-spark-sql pyspark
以下代码读取相同的 csv 两次,即使只调用一个操作
端到端可运行示例:
import pandas as pd
import numpy as np
df1= pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField
spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()
schema = StructType([StructField('index', StringType(), True),
StructField('0', StringType(), True)])
df1 = spark.read.csv("./df1.csv", header=True, schema = schema)
df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')
df3.explain()
df3.count()
Run Code Online (Sandbox Code Playgroud)
Web UI 中的 sql 选项卡显示以下内容:
如您所见,df1 文件被读取了两次。这是预期的行为吗?为什么会发生这种情况?我只有一项操作,因此管道的同一部分不应运行多次。
我已经在这里阅读了答案。问题几乎是相同的,但是在该问题中使用了 RDD,并且我在 pyspark API 中使用了数据帧。在这个问题中,建议如果要避免多个文件扫描,那么 DataFrames API 会有所帮助,这就是 DataFrama API 存在的首要原因
然而,事实证明,我也面临着与 DataFrame 完全相同的问题。Spark 因其效率而闻名,但效率如此低下,这似乎很奇怪(大多数情况下我只是错过了一些东西,这不是一个有效的批评:))
我的看法是分阶段的。每个阶段都是在数据的不同分区上并行运行的相同任务的集合。
您的查询有 4 个阶段:
正如您所注意到的,Stage 0两者Stage 1都读取您的 CSV 文件。但这是有道理的,Stage 0并且Stage 1彼此完全独立。他们可能正在读取相同的数据,但正在做不同的事情。
一般来说,在开始计算中Stage 0的任何任务之前,将首先执行中的所有任务。Stage 1因此,如果您想避免重复读入数据,则需要执行以下任一操作:
有一个计算 2 个输出的阶段(在本例中为 的两个输入Stage 2)。这将显着改变 Spark 架构,因为现在始终只存在具有 1 个输出的阶段。
Spark 确实可以(在幕后)(就像 thebluephantom 所说的那样)为.cache您决定这个数据集,但这意味着它实际上填满了您的存储内存,而您甚至没有提出这个要求(并且有进一步计算性能降低的风险)。如果底层进程实际上开始缓存您的数据,那么您将很难知道存储内存的填充情况。
| 归档时间: |
|
| 查看次数: |
710 次 |
| 最近记录: |