jos*_*hlk 6 python memory-leaks numpy apache-spark pyspark
我有以下最小的工作示例:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np
sc = SparkContext()
sqlContext = SQLContext(sc)
# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)
def toy_example(rdd):
# Read in pySpark DataFrame partition
data = list(rdd)
# Generate random data using Numpy
rand_data = np.random.random(int(1e7))
# Apply the `int` function to each element of `rand_data`
for i in range(len(rand_data)):
e = rand_data[i]
int(e)
# Return a single `0` value
return [[0]]
# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()
Run Code Online (Sandbox Code Playgroud)
当上述运行时,执行程序的Python进程的内存在每次迭代后稳步增加,这表明上一迭代的内存没有释放-即内存泄漏。如果内存超出了执行程序的内存限制,则可能导致作业失败-参见下文:
奇怪的是,以下任何一项都可以防止内存泄漏:
data = list(rdd)rand_data = list(rand_data.tolist())后面插入行rand_data = np.random.random(int(1e7))int(e)上面的代码是无法使用以上修复程序的大型项目的最小工作示例。
需要注意的一些事项:
rdd函数中未使用数据,但需要使用该行来重现泄漏。在实际项目中,将使用rdd数据。rand_data未释放大型Numpy阵列int对的每个元素进行操作rand_data以重现泄漏您可以rand_data通过在toy_example函数的前几行或后几行中插入代码来强制PySpark执行程序释放内存吗?
通过在函数末尾插入来强制垃圾回收:
del data, rand_data
import gc
gc.collect()
Run Code Online (Sandbox Code Playgroud)
通过在函数的结尾或开头插入来强制释放内存(受Pandas的启发):
from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
Run Code Online (Sandbox Code Playgroud)
以下PySpark作业在具有一个m4.xlarge工作节点的AWS EMR集群上运行。Numpy必须通过引导程序 pip安装在工作程序节点上。
使用以下功能测量了执行者的内存(打印到执行者的日志中):
import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Run Code Online (Sandbox Code Playgroud)
Spark提交配置:
版本: