我目前正在使用PySpark开发我的第一个整个系统,并且遇到了一些奇怪的与内存相关的问题。在其中一个阶段中,我想类似于“拆分应用合并”策略以修改DataFrame。也就是说,我想对给定列定义的每个组应用一个函数,最后将它们全部合并。问题是,我要应用的函数是一种针对拟合模型的预测方法,该模型“说出”了熊猫的成语,即将其矢量化并以熊猫系列作为输入。
然后,我设计了一种迭代策略,遍历各个组并手动应用pandas_udf.Scalar来解决问题。组合部分使用对DataFrame.unionByName()的增量调用完成。我决定不使用pandas_udf的GroupedMap类型,因为文档指出该内存应由用户管理,并且只要其中一个组太大而无法保存在内存中或由一组表示,则应格外小心熊猫DataFrame。
主要问题是所有处理似乎都可以正常运行,但最后我想将最终的DataFrame序列化为Parquet文件。在这一点上,我收到了许多关于DataFrameWriter的类似Java的错误或内存不足异常。
我已经在Windows和Linux机器上尝试过该代码。我设法避免错误的唯一方法是增加机器中的--driver-memory值。最小值在每个平台上都不同,并且取决于问题的大小,这使我怀疑内存泄漏。
直到我开始使用pandas_udf时,问题才发生。我认为在使用pandas_udf进行的pyarrow序列化的整个过程中,可能在某处内存泄漏。
我创建了一个最小的可复制示例。如果我直接使用Python运行此脚本,则会产生错误。使用提交火花并增加大量驱动程序内存,可以使其正常工作。
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp
# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
return x + 100.0
# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
"mre").master("local[3]").getOrCreate()
sc = spark.sparkContext
# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"
z = 105
m = 750000
schema = spktyp.StructType(
[spktyp.StructField("ID", spktyp.DoubleType(), True)]
)
df = spark.createDataFrame(
[(float(i),) for i in range(m)],
schema
)
for j in range(z):
df = df.withColumn(
f"N{j}",
F.col("ID") + float(j)
) …Run Code Online (Sandbox Code Playgroud)