小编nai*_*meh的帖子

在大型数据集上运行 Pandas UDF 时出现问题

我目前正在开发一个项目,但我很难理解 PySpark 中的 Pandas UDF 是如何工作的。

我有一个 Spark 集群,其中有一个 8 核、64GB 的主节点,以及两个各 16 核、112GB 的工作节点。我的数据集非常大,分为七个主要分区,每个分区包含约 78M 行。该数据集由 70 列组成。我定义了一个 Pandas UDF 来对数据集执行一些操作,这只能使用 Python 在 Pandas 数据帧上完成。

pandas UDF 是这样定义的:

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)
Run Code Online (Sandbox Code Playgroud)

绝对没有办法让 Pandas UDF 工作,因为它在执行操作之前就崩溃了。我怀疑某处存在 OOM 错误。上面的代码运行了几分钟,然后崩溃并显示错误代码,表明连接已重置。但是,如果我在一个分区上过滤后调用 .toPandas() 函数然后显示它,它运行正常,没有错误。该错误似乎仅在使用 PandasUDF 时发生。

我不明白它是如何工作的。Spark 是否尝试一次转换整个分区(78M 行)?如果是这样,它使用什么内存?驾驶员记忆?执行者的?如果它在驱动程序上,那么所有 Python 代码都在其上执行吗?

集群配置如下:

  • SPARK_WORKER_CORES=2
  • SPARK_WORKER_MEMORY=64g
  • Spark.executor.cores 2
  • Spark.executor.memory 30g(为Python实例提供内存)
  • 火花.驱动.内存 43g

我是否遗漏了某些内容,或者是否没有办法通过 PandasUDF 运行 78M 行?

python apache-spark pyspark pyarrow

7
推荐指数
1
解决办法
5118
查看次数

标签 统计

apache-spark ×1

pyarrow ×1

pyspark ×1

python ×1