Ras*_*åth 4 user-defined-functions apache-spark pyspark apache-arrow
我正在运行一项工作pyspark
,我曾在其中使用分组聚合 Pandas UDF。这会导致以下(此处为缩写)错误:
org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
Run Code Online (Sandbox Code Playgroud)
我相当确定这是因为 Pandas UDF 接收的组之一很大,如果我减少数据集并删除足够的行,我可以毫无问题地运行我的 UDF。但是,我想使用我的原始数据集运行,即使我在具有 192.0 GiB RAM 的机器上运行此 spark 作业,我仍然会遇到相同的错误。(并且 192.0 GiB 应该足以将整个数据集保存在内存中。)
我怎样才能给 spark 足够的内存来运行需要大量内存的分组聚合 Pandas UDF?
例如,是否有一些我错过的火花配置可以为 apache 箭头提供更多内存?
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
----> 1 device_attack_result.count()
2
3
4
/usr/lib/spark/python/pyspark/sql/dataframe.py in count(self)
520 2
521 """
--> 522 return int(self._jdf.count())
523
524 @ignore_unicode_prefix
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o818.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 102 in stage 27.0 failed 4 times, most recent failure: Lost task 102.3 in stage 27.0 (TID 3235, ip-172-31-111-163.ec2.internal, executor 1): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
...
Run Code Online (Sandbox Code Playgroud)
Spark 的 PandasUDF 功能使用 Arrow 框架将 spark DataFrame 转换为 pandas DataFrame,此时 Arrow 内部缓冲区限制仅为 2GB,因此您的 pandasUDF group by condition 不应产生超过 2 GB 的未压缩数据。
df.groupby('id').apply(function)
Run Code Online (Sandbox Code Playgroud)
我的意思是
仅当您的组分区大小小于 2 GB 未压缩时,您才能运行您的 Pandas UDF 方法
这是供您参考的门票
https://issues.apache.org/jira/browse/ARROW-4890
上述问题似乎在 >= 0.15 版本的 pyarrow 中得到解决,只有 Spark 3.x 使用 pyarrow 0.15 版本
Arrow 0.16 已将最大缓冲区分配大小从 MaxInteger 更改为 MaxLong(64 位) https://issues.apache.org/jira/browse/ARROW-6112
截至 2020 年 7 月,上游 Spark 仍基于 Arrow 0.15 https://github.com/apache/spark/blob/master/python/setup.py
不过,Netty 后备缓冲区仍然不支持这一点。
所以到目前为止,由于上述限制,这仍然是不可能的。
这可能会在 Spark 端解决 https://issues.apache.org/jira/browse/SPARK-32294 这个想法是将 GroupedData 分批提供给 Pandas UDF 来解决这个问题。
更新:Databricks 平台上的 PySpark 没有这个问题。需要 DBR7.4+
--executor-memory
Spark-submit 选项设置为180g
,以便 Spark 利用所有可用内存? 归档时间: |
|
查看次数: |
1960 次 |
最近记录: |