我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)
绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?
集群配置如下:
我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?