nai*_*meh 7 python apache-spark pyspark pyarrow
我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。
我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。
pandas UDF 是这样定义的:
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)
绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。
我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?
集群配置如下:
我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?
Spark 是否尝试一次转换整个分区(78M 行)?
这正是发生的情况。Spark 3.0 添加了对分块 UDF 的支持,这些 UDF 对 PandasDataFrames或的迭代器进行操作Series,但如果对数据集进行操作,则只能使用 Python 在 Pandas 数据帧上完成,这些可能不是您的正确选择。
如果是这样,它使用什么内存?驾驶员记忆?执行者的?
每个分区都在各自的执行器上进行本地处理,并使用 Arrow 流式传输将数据传入和传出 Python 工作线程。
我是否遗漏了什么,或者是否没有办法通过 PandasUDF 运行 78M 行?
只要您有足够的内存来处理 Arrow 输入、输出(特别是复制数据时)、辅助数据结构以及 JVM 开销,它就应该可以很好地处理大型数据集。
但在这样小的集群上,您可以更好地对输出进行分区并直接使用 Pandas 读取数据,而无需使用 Spark。这样,您将能够使用所有可用资源(即 > 100GB/解释器)进行数据处理,而不是将这些资源浪费在辅助任务上(具有 16GB - 开销/解释器)。