如何通过增加 spark 的内存来解决 pyspark `org.apache.arrow.vector.util.OversizedAllocationException` 错误?

Ras*_*åth 4 user-defined-functions apache-spark pyspark apache-arrow

我正在运行一项工作pyspark,我曾在其中使用分组聚合 Pandas UDF。这会导致以下(此处为缩写)错误:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
Run Code Online (Sandbox Code Playgroud)

我相当确定这是因为 Pandas UDF 接收的组之一很大,如果我减少数据集并删除足够的行,我可以毫无问题地运行我的 UDF。但是,我想使用我的原始数据集运行,即使我在具有 192.0 GiB RAM 的机器上运行此 spark 作业,我仍然会遇到相同的错误。(并且 192.0 GiB 应该足以将整个数据集保存在内存中。)

我怎样才能给 spark 足够的内存来运行需要大量内存的分组聚合 Pandas UDF?

例如,是否有一些我错过的火花配置可以为 apache 箭头提供更多内存?

更长的错误信息

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
 in 
----> 1 device_attack_result.count()
      2 
      3 
      4 

/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
    520         2
    521         """
--> 522         return int(self._jdf.count())
    523 
    524     @ignore_unicode_prefix

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...
Run Code Online (Sandbox Code Playgroud)

完整的错误信息在这里。

Sur*_*nti 6

Spark 的 PandasUDF 功能使用 Arrow 框架将 spark DataFrame 转换为 pandas DataFrame,此时 Arrow 内部缓冲区限制仅为 2GB,因此您的 pandasUDF group by condition 不应产生超过 2 GB 的未压缩数据。

df.groupby('id').apply(function)
Run Code Online (Sandbox Code Playgroud)

我的意思是

仅当您的组分区大小小于 2 GB 未压缩时,您才能运行您的 Pandas UDF 方法

这是供您参考的门票

https://issues.apache.org/jira/browse/ARROW-4890

上述问题似乎在 >= 0.15 版本的 pyarrow 中得到解决,只有 Spark 3.x 使用 pyarrow 0.15 版本


Tag*_*gar 5

Arrow 0.16 已将最大缓冲区分配大小从 MaxInteger 更改为 MaxLong(64 位) https://issues.apache.org/jira/browse/ARROW-6112

截至 2020 年 7 月,上游 Spark 仍基于 Arrow 0.15 https://github.com/apache/spark/blob/master/python/setup.py

不过,Netty 后备缓冲区仍然不支持这一点。

所以到目前为止,由于上述限制,这仍然是不可能的。

这可能会在 Spark 端解决 https://issues.apache.org/jira/browse/SPARK-32294 这个想法是将 GroupedData 分批提供给 Pandas UDF 来解决这个问题。

更新:Databricks 平台上的 PySpark 没有这个问题。需要 DBR7.4+


Art*_*sia 4

  1. 您是否尝试过将--executor-memorySpark-submit 选项设置为180g,以便 Spark 利用所有可用内存?
  2. 实际上,Spark 看起来并不存在 OOMing 或典型的数据倾斜问题。当您的数据结构之一遇到 Apache Arrow 内部限制时,这看起来是一种相当奇怪的情况 - 没有缓冲区的大小不能大于 Integer.MAX_VALUE 字节:https: //github.com/apache/arrow/blob/157b179812adb8f29e5966682ff1937f85ce192a/java/矢量/src/main/java/org/apache/arrow/vector/BaseValueVector.java#L42。我不确定 Arrow 是如何工作的,但对我来说,您的一个数据点似乎包含超过 4Gbs 的数据