PySpark:执行程序映射分区功能中未释放numpy内存(内存泄漏)

jos*_*hlk 6 python memory-leaks numpy apache-spark pyspark

我有以下最小的工作示例:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import numpy as np

sc = SparkContext()
sqlContext = SQLContext(sc)

# Create dummy pySpark DataFrame with 1e5 rows and 16 partitions
df = sqlContext.range(0, int(1e5), numPartitions=16)

def toy_example(rdd):

    # Read in pySpark DataFrame partition
    data = list(rdd)

    # Generate random data using Numpy
    rand_data = np.random.random(int(1e7))

    # Apply the `int` function to each element of `rand_data`
    for i in range(len(rand_data)):
        e = rand_data[i]
        int(e)

    # Return a single `0` value
    return [[0]]

# Execute the above function on each partition (16 partitions)
result = df.rdd.mapPartitions(toy_example)
result = result.collect()
Run Code Online (Sandbox Code Playgroud)

当上述运行时,执行程序的Python进程的内存在每次迭代后稳步增加,这表明上一迭代的内存没有释放-即内存泄漏。如果内存超出了执行程序的内存限制,则可能导致作业失败-参见下文:

在此处输入图片说明

奇怪的是,以下任何一项都可以防止内存泄漏:

  • 删除线 data = list(rdd)
  • rand_data = list(rand_data.tolist())后面插入行rand_data = np.random.random(int(1e7))
  • 删除线 int(e)

上面的代码是无法使用以上修复程序的大型项目的最小工作示例。

需要注意的一些事项:

  • 虽然rdd函数中未使用数据,但需要使用该行来重现泄漏。在实际项目中,将使用rdd数据。
  • 内存泄漏很可能是由于rand_data未释放大型Numpy阵列
  • 您必须int对的每个元素进行操作rand_data以重现泄漏

您可以rand_data通过在toy_example函数的前几行或后几行中插入代码来强制PySpark执行程序释放内存吗?

已经尝试过的

通过在函数末尾插入来强制垃圾回收:

del data, rand_data
import gc
gc.collect()
Run Code Online (Sandbox Code Playgroud)

通过在函数的结尾或开头插入来强制释放内存(受Pandas的启发):

from ctypes import cdll, CDLL
cdll.LoadLibrary("libc.so.6")
libc = CDLL("libc.so.6")
libc.malloc_trim(0)
Run Code Online (Sandbox Code Playgroud)

设置,测量和版本

以下PySpark作业在具有一个m4.xlarge工作节点的AWS EMR集群上运行。Numpy必须通过引导程序 pip安装在工作程序节点上。

使用以下功能测量了执行者的内存(打印到执行者的日志中):

import resource
resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
Run Code Online (Sandbox Code Playgroud)

Spark提交配置:

  • spark.executor.instances = 1
  • spark.executor.cores = 1
  • spark.executor.memory = 6克
  • spark.master =纱线
  • spark.dynamicAllocation.enabled = false

版本:

  • EMR 5.12.1
  • 火花2.2.1
  • Python 2.7.13
  • 脾气暴躁的1.14.0

小智 5

我们最近遇到了一个非常相似的问题,我们也无法通过更改代码来强制释放内存。然而,对我们有用的是使用以下 Spark 选项: spark.python.worker.reuse = False