使用pandas_udf和Parquet序列化时发生内存泄漏?

Fer*_*dez 10 python pandas pyspark pyspark-sql pyarrow

我目前正在使用PySpark开发我的第一个整个系统,并且遇到了一些奇怪的与内存相关的问题。在其中一个阶段中,我想类似于“拆分应用合并”策略以修改DataFrame。也就是说,我想对给定列定义的每个组应用一个函数,最后将它们全部合并。问题是,我要应用的函数是一种针对拟合模型的预测方法,该模型“说出”了熊猫的成语,即将其矢量化并以熊猫系列作为输入。

然后,我设计了一种迭代策略,遍历各个组并手动应用pandas_udf.Scalar来解决问题。组合部分使用对DataFrame.unionByName()的增量调用完成。我决定不使用pandas_udf的GroupedMap类型,因为文档指出该内存应由用户管理,并且只要其中一个组太大而无法保存在内存中或由一组表示,则应格外小心熊猫DataFrame。

主要问题是所有处理似乎都可以正常运行,但最后我想将最终的DataFrame序列化为Parquet文件。在这一点上,我收到了许多关于DataFrameWriter的类似Java的错误或内存不足异常。

我已经在Windows和Linux机器上尝试过该代码。我设法避免错误的唯一方法是增加机器中的--driver-memory值。最小值在每个平台上都不同,并且取决于问题的大小,这使我怀疑内存泄漏。

直到我开始使用pandas_udf时,问题才发生。我认为在使用pandas_udf进行的pyarrow序列化的整个过程中,可能在某处内存泄漏。

我创建了一个最小的可复制示例。如果我直接使用Python运行此脚本,则会产生错误。使用提交火花并增加大量驱动程序内存,可以使其正常工作。

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as spktyp


# Dummy pandas_udf -------------------------------------------------------------
@F.pandas_udf(spktyp.DoubleType())
def predict(x):
    return x + 100.0


# Initialization ---------------------------------------------------------------
spark = pyspark.sql.SparkSession.builder.appName(
        "mre").master("local[3]").getOrCreate()

sc = spark.sparkContext

# Generate a dataframe ---------------------------------------------------------
out_path = "out.parquet"

z = 105
m = 750000

schema = spktyp.StructType(
    [spktyp.StructField("ID", spktyp.DoubleType(), True)]
)

df = spark.createDataFrame(
    [(float(i),) for i in range(m)],
    schema
)

for j in range(z):
    df = df.withColumn(
        f"N{j}",
        F.col("ID") + float(j)
    )

df = df.withColumn(
    "X",
    F.array(
        F.lit("A"),
        F.lit("B"),
        F.lit("C"),
        F.lit("D"),
        F.lit("E")
    ).getItem(
        (F.rand()*3).cast("int")
    )
)

# Set the column names for grouping, input and output --------------------------
group_col = "X"
in_col = "N0"
out_col = "EP"

# Extract different group ids in grouping variable -----------------------------
rows = df.select(group_col).distinct().collect()
groups = [row[group_col] for row in rows]
print(f"Groups: {groups}")

# Split and treat the first id -------------------------------------------------
first, *others = groups

cur_df = df.filter(F.col(group_col) == first)
result = cur_df.withColumn(
    out_col,
    predict(in_col)
)

# Traverse the remaining group ids ---------------------------------------------
for i, other in enumerate(others):
    cur_df = df.filter(F.col(group_col) == other)
    new_df = cur_df.withColumn(
        out_col,
        predict(in_col)
    )

    # Incremental union --------------------------------------------------------
    result = result.unionByName(new_df)

# Save to disk -----------------------------------------------------------------
result.write.mode("overwrite").parquet(out_path)
Run Code Online (Sandbox Code Playgroud)

令人震惊的(至少对我来说),如果我在序列化语句之前调用repartition(),问题似乎就消失了。

result = result.repartition(result.rdd.getNumPartitions())
result.write.mode("overwrite").parquet(out_path)
Run Code Online (Sandbox Code Playgroud)

放置此行之后,我可以降低很多驱动程序内存配置,并且脚本可以正常运行。尽管我怀疑对代码的懒惰评估和pyarrow序列化可能相关,但我几乎无法理解所有这些因素之间的关系。

这是我用于开发的当前环境:

arrow-cpp                 0.13.0           py36hee3af98_1    conda-forge
asn1crypto                0.24.0                py36_1003    conda-forge
astroid                   2.2.5                    py36_0
atomicwrites              1.3.0                      py_0    conda-forge
attrs                     19.1.0                     py_0    conda-forge
blas                      1.0                         mkl
boost-cpp                 1.68.0            h6a4c333_1000    conda-forge
brotli                    1.0.7             he025d50_1000    conda-forge
ca-certificates           2019.3.9             hecc5488_0    conda-forge
certifi                   2019.3.9                 py36_0    conda-forge
cffi                      1.12.3           py36hb32ad35_0    conda-forge
chardet                   3.0.4                 py36_1003    conda-forge
colorama                  0.4.1                    py36_0
cryptography              2.6.1            py36hb32ad35_0    conda-forge
dill                      0.2.9                    py36_0
docopt                    0.6.2                    py36_0
entrypoints               0.3                      py36_0
falcon                    1.4.1.post1     py36hfa6e2cd_1000    conda-forge
fastavro                  0.21.21          py36hfa6e2cd_0    conda-forge
flake8                    3.7.7                    py36_0
future                    0.17.1                py36_1000    conda-forge
gflags                    2.2.2                ha925a31_0
glog                      0.3.5                h6538335_1
hug                       2.5.2            py36hfa6e2cd_0    conda-forge
icc_rt                    2019.0.0             h0cc432a_1
idna                      2.8                   py36_1000    conda-forge
intel-openmp              2019.3                      203
isort                     4.3.17                   py36_0
lazy-object-proxy         1.3.1            py36hfa6e2cd_2
libboost                  1.67.0               hd9e427e_4
libprotobuf               3.7.1                h1a1b453_0    conda-forge
lz4-c                     1.8.1.2              h2fa13f4_0
mccabe                    0.6.1                    py36_1
mkl                       2018.0.3                      1
mkl_fft                   1.0.6            py36hdbbee80_0
mkl_random                1.0.1            py36h77b88f5_1
more-itertools            4.3.0                 py36_1000    conda-forge
ninabrlong                0.1.0                     dev_0    <develop>
nose                      1.3.7                 py36_1002    conda-forge
nose-exclude              0.5.0                      py_0    conda-forge
numpy                     1.15.0           py36h9fa60d3_0
numpy-base                1.15.0           py36h4a99626_0
openssl                   1.1.1b               hfa6e2cd_2    conda-forge
pandas                    0.23.3           py36h830ac7b_0
parquet-cpp               1.5.1                         2    conda-forge
pip                       19.0.3                   py36_0
pluggy                    0.11.0                     py_0    conda-forge
progressbar2              3.38.0                     py_1    conda-forge
py                        1.8.0                      py_0    conda-forge
py4j                      0.10.7                   py36_0
pyarrow                   0.13.0           py36h8c67754_0    conda-forge
pycodestyle               2.5.0                    py36_0
pycparser                 2.19                     py36_1    conda-forge
pyflakes                  2.1.1                    py36_0
pygam                     0.8.0                      py_0    conda-forge
pylint                    2.3.1                    py36_0
pyopenssl                 19.0.0                   py36_0    conda-forge
pyreadline                2.1                      py36_1
pysocks                   1.6.8                 py36_1002    conda-forge
pyspark                   2.4.1                      py_0
pytest                    4.5.0                    py36_0    conda-forge
pytest-runner             4.4                        py_0    conda-forge
python                    3.6.6                hea74fb7_0
python-dateutil           2.8.0                    py36_0
python-hdfs               2.3.1                      py_0    conda-forge
python-mimeparse          1.6.0                      py_1    conda-forge
python-utils              2.3.0                      py_1    conda-forge
pytz                      2019.1                     py_0
re2                       2019.04.01       vc14h6538335_0  [vc14]  conda-forge
requests                  2.21.0                py36_1000    conda-forge
requests-kerberos         0.12.0                   py36_0
scikit-learn              0.20.1           py36hb854c30_0
scipy                     1.1.0            py36hc28095f_0
setuptools                41.0.0                   py36_0
six                       1.12.0                   py36_0
snappy                    1.1.7                h777316e_3
sqlite                    3.28.0               he774522_0
thrift-cpp                0.12.0            h59828bf_1002    conda-forge
typed-ast                 1.3.1            py36he774522_0
urllib3                   1.24.2                   py36_0    conda-forge
vc                        14.1                 h0510ff6_4
vs2015_runtime            14.15.26706          h3a45250_0
wcwidth                   0.1.7                      py_1    conda-forge
wheel                     0.33.1                   py36_0
win_inet_pton             1.1.0                    py36_0    conda-forge
wincertstore              0.2              py36h7fe50ca_0
winkerberos               0.7.0                    py36_1
wrapt                     1.11.1           py36he774522_0
xz                        5.2.4                h2fa13f4_4
zlib                      1.2.11               h62dcd97_3
zstd                      1.3.3                hfe6a214_0
Run Code Online (Sandbox Code Playgroud)

任何提示或帮助将不胜感激。

小智 5

我想评论您的帖子,但是我的声誉太低。

根据我的经验,udf会大大降低您的性能,特别是如果您使用python(或pandas?)编写它们。有一篇文章,为什么你不应该使用python udfs而是使用scala udfs:https ://medium.com/wbaa/using-scala-udfs-in-pyspark-b70033dd69b9

以我为例,即使它非常复杂,也可以使用内置函数,并且运行时间比以前减少了大约5%。

对于您的OOM错误以及为什么分区适合您,我没有任何解释。我唯一可以给您的建议是尽可能避免UDF,尽管您的情况似乎并不那么容易。

  • 这篇文章非常有趣。谢谢你。问题是,我在这里尝试使用的是 pandas_udf,而不是普通的 UDF。我们需要它,因为我们想利用类似 sklearn 的模型的矢量化 predict() 方法。使用 UDF 意味着我们必须在每次观察时调用模型一次,我想避免这种情况。 (3认同)