在大型数据集上运行 Pandas UDF 时出现问题

nai*_*meh 7 python apache-spark pyspark pyarrow

我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。

我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。

pandas UDF 是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)

绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。

我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?

集群配置如下:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • Spark.executor.cores 2
  • Spark.executor.memory 30g(为Python实例提供内存)
  • 火花.驱动.内存 43g

我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?

use*_*362 5

Spark 是否尝试一次转换整个分区(78M 行)?

这正是发生的情况。Spark 3.0 添加了对分块 UDF 的支持,这些 UDF 对 PandasDataFrames或的迭代器进行操作Series,但如果对数据集进行操作,则只能使用 Python 在 Pandas 数据帧上完成,这些可能不是您的正确选择。

如果是这样,它使用什么内存?驾驶员记忆?执行者的?

每个分区都在各自的执行器上进行本地处理,并使用 Arrow 流式传输将数据传入和传出 Python 工作线程。

我是否遗漏了什么,或者是否没有办法通过 PandasUDF 运行 78M 行?

只要您有足够的内存来处理 Arrow 输入、输出(特别是复制数据时)、辅助数据结构以及 JVM 开销,它就应该可以很好地处理大型数据集。

但在这样小的集群上,您可以更好地对输出进行分区并直接使用 Pandas 读取数据,而无需使用 Spark。这样,您将能够使用所有可用资源(即 > 100GB/解释器)进行数据处理,而不是将这些资源浪费在辅助任务上(具有 16GB - 开销/解释器)。