使用 dask 或 joblib multiprocessing 编译可执行文件会导致错误

Bas*_*ing 2 python windows cython joblib dask

我正在使用 dask 或 joblib 将一些串行处理的 python 作业转换为多处理。可悲的是,我需要在 Windows 上工作。
当从 IPython 内运行或从命令行使用 python 调用 py 文件时,一切运行正常。
使用 cython 编译可执行文件时,它不再正常运行:越来越多的进程(无限制且大于请求的进程数)开始启动并阻塞我的系统。
它以某种方式感觉像多处理炸弹- 但当然我曾经if __name__=="__main__:"拥有控制块 - 通过在命令行中从 python 调用中正常运行获得批准。
我的 cython 调用是cython --embed --verbose --annotate THECODE.PY,我正在编译gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE生成一个 windows 可执行文件THECODE.exe
其他(单处理)代码运行良好。
dask 和 joblib 的问题似乎相同(这可能意味着 dask 的工作方式类似于或基于 joblib)。
有什么建议?

对于那些对mcve感兴趣的:只需从Multiprocessing Bomb 中获取第一个代码并使用上面的 cython 命令编译它就会导致一个可执行文件炸毁你的系统。(我刚试过:-))

通过在代码示例中添加一行以显示以下内容,我发现了一些有趣的东西__name__

import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

print("-->" + __name__ + "<--")
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()
Run Code Online (Sandbox Code Playgroud)

当用python它运行那段代码时显示

__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
Run Code Online (Sandbox Code Playgroud)

(其他输出被抑制)。解释 if 决定有效。在显示 cython 和编译后运行可执行文件时

__main__
__main__
__main__
__main__
__main__
__main__
Run Code Online (Sandbox Code Playgroud)

而且越来越多。因此,工作人员对模块的调用不再masqueraded像导入一样,因此每个工作人员都尝试以递归方式启动五个新的工作人员。

ead*_*ead 6

在 Windows 上启动新的 python-process multiprocessing-module 时使用spawn-method(也可以在 Linux 上使用mp.set_start_method('spawn').

命令行参数传递给新进程中的解释器,因此可以建立与父进程的通信,例如:

 python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
Run Code Online (Sandbox Code Playgroud)

嵌入式 cython 模块(或冻结(即使用 cx_Freeze、py2exe 和类似模块创建)模块的问题),将命令行参数传递给它们更多地对应于

python my_script.py <arguments>
Run Code Online (Sandbox Code Playgroud)

即命令行不会被 interpeter 自动处理,而是需要在脚本中处理。

multiprocessing提供了一个名为 的函数multiprocessing.freeze_support(),它可以正确处理命令行参数,并且可以如Bastian 的回答所示使用:

if __name__ == '__main__':
    # needed for Cython, as it doesn't set `frozen`-attribute
    setattr(sys, 'frozen', True) 
    # parse command line options and execute it if needed
    multiprocessing.freeze_support()
Run Code Online (Sandbox Code Playgroud)

但是,此解决方案仅适用于 Windows,如代码所示:

def freeze_support(self):
    '''Check whether this is a fake forked process in a frozen executable.
    If so then run code specified by commandline and exit.
    '''
    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
        from .spawn import freeze_support
        freeze_support()
Run Code Online (Sandbox Code Playgroud)

有一个错误报告:在 win32 之外需要 multiprocessing freeze_support这可能/可能不会很快修复。

正如在上面的错误报告中所解释的,仅仅设置frozen属性Truefreeze_support直接从调用是不够的,multiprocessing.spawn因为信号量跟踪器没有正确处理。

我看到有两种选择:要么使用上述错误报告中尚未发布的补丁修补您的安装,要么使用下面介绍的自己动手做的方法。


这是这个答案的早期版本,它更“实验性”,但提供了更多的见解/细节,并以有点自己动手的方式提出了一个解决方案。

我在 linux 上,所以我mp.set_start_method('spawn')用来模拟 windows 的行为。

spawn-mode 中会发生什么?让我们添加一些sleeps,以便我们可以调查过程:

#bomb.py
import multiprocessing as mp
import sys
import time

def worker():
    time.sleep(50)
    print('Worker')
    return

if __name__ == '__main__':
        print("Starting...")
        time.sleep(20)
        mp.set_start_method('spawn') ## use spawn!
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()
Run Code Online (Sandbox Code Playgroud)

通过使用pgrep python我们可以看到,一开始只有一个python进程,然后是7(!)个不同的pids。我们可以通过cat /proc/<pid>/cmdline. 5 个新进程有命令行

-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
Run Code Online (Sandbox Code Playgroud)

和一:

-c "from multiprocessing.semaphore_tracker import main;main(4)"
Run Code Online (Sandbox Code Playgroud)

这意味着,父进程启动了 6 个新的 Python 解释器实例,每个新启动的解释器都执行通过命令行选项从父进程发送的代码,信息通过管道共享。这 6 个 python 实例之一是跟踪器,它观察整个事情。

好的,如果 cythonized+embeded 会发生什么?和普通的python一样,唯一的区别bomb是启动了-executable而不是python。但与 python 解释器不同的是,它不执行/不知道命令行参数,因此该main函数一遍又一遍地运行。

有一个简单的解决方法:让bomb-exe 启动 python 解释器

 ...
 if __name__ == '__main__':
    mp.set_executable(<PATH TO PYTHON>)
 ....
Run Code Online (Sandbox Code Playgroud)

现在bomb不再是多处理炸弹!

然而,目标可能不是有一个 python 解释器,所以我们需要让我们的程序知道可能的命令行:

import re
......
if __name__ == '__main__':
    if len(sys.argv)==3:  # should start in semaphore_tracker mode
        nr=list(map(int, re.findall(r'\d+',sys.argv[2])))          
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work   
        from multiprocessing.semaphore_tracker import main;main(nr[0])

    elif len(sys.argv)>3: # should start in slave mode
        fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
        print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe)) 
        sys.argv[1]='--multiprocessing-fork'   # this canary is needed for multiprocessing module to work  
        from multiprocessing.spawn import spawn_main; 
        spawn_main(tracker_fd=fd, pipe_handle=pipe)

    else: #main mode
        print("Starting...")
        mp.set_start_method('spawn')
        jobs = []
        for i in range(5):
            p = mp.Process(target=worker)
            jobs.append(p)
            p.start()
Run Code Online (Sandbox Code Playgroud)

现在,我们的炸弹不需要独立的 python 解释器,并且在工作完成后停止。请注意以下事项:

  1. 它的决定方式,bomb应该在哪种模式下启动不是很安全,但我希望你明白要点
  2. --multiprocessing-fork只是一只金丝雀,它不做任何事情,它只是必须在那里,请参见此处

注意:更改后的代码也可以与python一起使用,因为执行"from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-forkpython后更改了sys.argv所以代码不再看到原始命令行并且len(sys.argv)1.