Python fork: "Cannot allocate memory" if process consumes more than 50% of available memory

Asked by Leo — tags: python, memory, fork, allocation, popen

I'm running into memory-allocation problems when forking processes in Python. I know the issue has been discussed in some other posts here, but I couldn't find a good solution in any of them.

Here is a sample script illustrating the problem:

import os
import psutil
import subprocess
pid = os.getpid()
this_proc = psutil.Process(pid)
MAX_MEM = int(psutil.virtual_memory().free*1E-9) # in GB
def consume_memory(size):
    """ Size in GB """
    memory_consumer = []
    while get_mem_usage() < size:
        memory_consumer.append(" "*1000000) # Adding ~1MB
    return(memory_consumer)

def get_mem_usage():
    return(this_proc.memory_info()[0]/2.**30)

def get_free_mem():
    return(psutil.virtual_memory().free/2.**30)

if __name__ == "__main__":
    for i in range(1, MAX_MEM):
        consumer = consume_memory(i)
        mem_usage = get_mem_usage()
        print("\n## Memory usage %d/%d GB (%2d%%) ##" % (int(mem_usage), 
              MAX_MEM, int(mem_usage*100/MAX_MEM)))
        try:
            subprocess.call(['echo', '[OK] Fork worked.'])
        except OSError as e:
            print("[ERROR] Fork failed. Got OSError.")
            print(e)
        del consumer

The script was tested with Python 2.7 and 3.6 on Arch Linux and uses psutil to track memory usage. It gradually increases the memory consumption of the Python process and repeatedly tries to fork a process with subprocess.call(). The fork fails once the parent process has consumed more than 50% of the available memory:

## Memory usage 1/19 GB ( 5%) ##
[OK] Fork worked.

## Memory usage 2/19 GB (10%) ##
[OK] Fork worked.

## Memory usage 3/19 GB (15%) ##
[OK] Fork worked.

[...]

## Memory usage 9/19 GB (47%) ##
[OK] Fork worked.

## Memory usage 10/19 GB (52%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory

## Memory usage 11/19 GB (57%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory

## Memory usage 12/19 GB (63%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory

## Memory usage 13/19 GB (68%) ##
[ERROR] Fork failed. Got OSError.
[Errno 12] Cannot allocate memory

[...]

Note that I did not have swap activated when running this test.

There seem to be two options for working around this problem:

  • Use a swap partition at least twice the size of the physical memory.
  • Change the overcommit_memory setting: echo 1 > /proc/sys/vm/overcommit_memory

I tried the latter on my desktop machine, and the script above finished without errors. However, on the compute cluster I'm working on, I cannot use either of these options.

Unfortunately, forking the required processes in advance, before consuming the memory, is not an option either.

Does anybody have another suggestion on how to solve this problem?

Thank you!

Best,

Leonhard

Answer by Ond*_* K.:

The problem you are facing is not really Python-related, and it is also not something you could do much about with Python alone. Starting a forking (executor) process up front, as mbrig suggested in the comments, really seems to be the best and cleanest option for this scenario.

Python or not, you are dealing with how Linux (and similar systems) creates new processes. Your parent process first calls fork(2), which creates a new child process as a copy of itself. The child's memory is not actually copied at that point (copy-on-write is used); nonetheless, the kernel checks whether enough memory could be committed for it, and if not, fork fails with errno set to 12: ENOMEM -> the OSError exception you're seeing.
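
To see that it is fork(2) itself that trips, here is a minimal sketch (my addition, assuming Linux and /bin/echo) that reproduces the same errno with a bare os.fork()/os.execv() pair, which is essentially what subprocess.call() does under the hood:

import errno
import os

try:
    child = os.fork()              # the same fork(2) subprocess relies on
    if child == 0:                 # child: replace ourselves with echo
        os.execv("/bin/echo", ["/bin/echo", "[OK] Fork worked."])
    os.waitpid(child, 0)           # parent: reap the child
except OSError as e:
    if e.errno == errno.ENOMEM:    # the same [Errno 12] as above
        print("[ERROR] Fork failed: %s" % e)
    else:
        raise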

Yes, allowing the VM subsystem to overcommit memory can suppress this error... and if you exec a new (smaller) program in the child right away, it does not have to cause any immediate failures either. But it sounds like kicking the problem further down the road: with overcommit enabled, allocations succeed optimistically and the failure can resurface later, when the memory is actually touched.
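
If you want to check where you stand, the current policy and the kernel's commit accounting can be read from procfs; a small sketch, assuming the standard Linux paths:

# 0 = heuristic overcommit (default), 1 = always overcommit, 2 = never
with open("/proc/sys/vm/overcommit_memory") as f:
    print("overcommit_memory: %s" % f.read().strip())

# CommitLimit is the ceiling, Committed_AS what is currently committed
with open("/proc/meminfo") as f:
    for line in f:
        if line.startswith(("CommitLimit", "Committed_AS")):
            print(line.strip())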

Growing memory (adding swap) pushes the limit: as long as twice your running process still fits into available memory plus swap, the fork can succeed. With the follow-up exec, the swap would not even need to get used.
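
Whether that route is even open can be checked quickly with the same psutil the question already uses:

import psutil

swap = psutil.swap_memory()   # namedtuple with total/used/free/percent
print("swap: %.1f GB total, %.1f GB free"
      % (swap.total/2.**30, swap.free/2.**30))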

There seems to be one more option, but it looks... dirty. There is another syscall, vfork(), which creates a new process that initially shares memory with its parent; the parent's execution is suspended at that point. The newly created child may only assign to the variable returned by vfork, call _exit(), or call an exec function. As such, it is not exposed through any Python interface, and when I tried loading it directly into Python using ctypes, it segfaulted (presumably because Python still does something other than those three permitted actions between the vfork and the point where I could exec something else in the child).

That said, you can delegate the whole vfork and exec to a shared object you load in. As a very rough proof of concept, I did just that:

#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

char run(char * const arg[]) {
    pid_t child;
    int wstatus;
    char ret_val = -1;

    child = vfork();
    if (child < 0) {
        printf("run: Failed to fork: %i\n", errno);
    } else if (child == 0) {
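        /* NOTE: POSIX only permits the vfork child to assign to the pid
         * variable, call _exit() or call an exec*() function; the printf
         * below is debug output and strictly speaking undefined. */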
        printf("arg: %s\n", arg[0]);
        execv(arg[0], arg);
        _exit(-1);
    } else {
        child = waitpid(child, &wstatus, 0);
        if (WIFEXITED(wstatus))
            ret_val = WEXITSTATUS(wstatus);
    }
    return ret_val;
}
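To load this with ctypes.CDLL as below, the C file needs to be built as a shared object first; something like gcc -shared -fPIC -o forker.so forker.c should do (the source file name is my assumption; forker.so matches the loading code).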

And I've modified your sample code in the following way (the bulk of the change is in and around the replacement of subprocess.call):

import ctypes
import os
import psutil
pid = os.getpid()
this_proc = psutil.Process(pid)
MAX_MEM = int(psutil.virtual_memory().free*1E-9) # in GB
def consume_memory(size):
    """ Size in GB """
    memory_consumer = []
    while get_mem_usage() < size:
        memory_consumer.append(" "*1000000) # Adding ~1MB
    return(memory_consumer)

def get_mem_usage():
    return(this_proc.memory_info()[0]/2.**30)

def get_free_mem():
    return(psutil.virtual_memory().free/2.**30)

if __name__ == "__main__":
    forker = ctypes.CDLL("forker.so", use_errno=True)
    for i in range(1, MAX_MEM):
        consumer = consume_memory(i)
        mem_usage = get_mem_usage()
        print("\n## Memory usage %d/%d GB (%2d%%) ##" % (int(mem_usage), 
              MAX_MEM, int(mem_usage*100/MAX_MEM)))
        try:
            cmd = [b"/bin/echo", b"[OK] Fork worked."]
            c_cmd = (ctypes.c_char_p * (len(cmd) + 1))()
            c_cmd[:] = cmd + [None]
            ctypes.set_errno(0)  # clear any stale errno before the call
            ret = forker.run(c_cmd)
            errno = ctypes.get_errno()
            if errno:
                raise OSError(errno, os.strerror(errno))
        except OSError as e:
            print("[ERROR] Fork failed. Got OSError.")
            print(e)
        del consumer

With that, I could still fork with 3/4 of the available memory reported as filled.

In theory this could all be written "properly" and wrapped up nicely to integrate well with Python code, but while it seems to be one additional option, I'd still go back to the executor process.


I've only briefly scanned through the concurrent.futures.process module, but once it spawns a worker process, the worker does not seem to be torn down before the pool is, so perhaps abusing an existing ProcessPoolExecutor would be a quick and cheap option. I've added these close to the top of the script (the main part):

import concurrent.futures

def nop():
    pass

executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
executor.submit(nop)  # start a worker process in the pool

And then submit the subprocess.call invocations to it:

proc = executor.submit(subprocess.call, ['echo', '[OK] Fork worked.'])
proc.result()  # can also collect the return value
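
For completeness, here is how the pieces could fit together (my assembly of the snippets above, not a tested drop-in): the pool's only worker is forked while the parent is still small, and every later fork/exec happens inside that small worker, so the parent's memory consumption no longer matters:

import concurrent.futures
import subprocess

def nop():
    pass

executor = concurrent.futures.ProcessPoolExecutor(max_workers=1)
executor.submit(nop).result()  # force the worker to start while we are small

# ... the parent grows by gigabytes here ...

future = executor.submit(subprocess.call, ['echo', '[OK] Fork worked.'])
print("echo returned: %d" % future.result())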