Bas*_*ing 2 python windows cython joblib dask
我正在使用 dask 或 joblib 将一些串行处理的 python 作业转换为多处理。可悲的是,我需要在 Windows 上工作。
当从 IPython 内运行或从命令行使用 python 调用 py 文件时,一切运行正常。
使用 cython 编译可执行文件时,它不再正常运行:越来越多的进程(无限制且大于请求的进程数)开始启动并阻塞我的系统。
它以某种方式感觉像多处理炸弹- 但当然我曾经if __name__=="__main__:"拥有控制块 - 通过在命令行中从 python 调用中正常运行获得批准。
我的 cython 调用是cython --embed --verbose --annotate THECODE.PY,我正在编译gcc -time -municode -DMS_WIN64 -mthreads -Wall -O -I"PATH_TO_\include" -L"PATH_TO_\libs" THECODE.c -lpython36 -o THECODE生成一个 windows 可执行文件THECODE.exe。
其他(单处理)代码运行良好。
dask 和 joblib 的问题似乎相同(这可能意味着 dask 的工作方式类似于或基于 joblib)。
有什么建议?
对于那些对mcve感兴趣的人:只需从Multiprocessing Bomb 中获取第一个代码并使用上面的 cython 命令编译它就会导致一个可执行文件炸毁你的系统。(我刚试过:-))
通过在代码示例中添加一行以显示以下内容,我发现了一些有趣的东西__name__:
import multiprocessing
def worker():
"""worker function"""
print('Worker')
return
print("-->" + __name__ + "<--")
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
Run Code Online (Sandbox Code Playgroud)
当用python它运行那段代码时显示
__main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
__mp_main__
Run Code Online (Sandbox Code Playgroud)
(其他输出被抑制)。解释 if 决定有效。在显示 cython 和编译后运行可执行文件时
__main__
__main__
__main__
__main__
__main__
__main__
Run Code Online (Sandbox Code Playgroud)
而且越来越多。因此,工作人员对模块的调用不再masqueraded像导入一样,因此每个工作人员都尝试以递归方式启动五个新的工作人员。
在 Windows 上启动新的 python-process multiprocessing-module 时使用spawn-method(也可以在 Linux 上使用mp.set_start_method('spawn').
命令行参数传递给新进程中的解释器,因此可以建立与父进程的通信,例如:
python -c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
Run Code Online (Sandbox Code Playgroud)
嵌入式 cython 模块(或冻结(即使用 cx_Freeze、py2exe 和类似模块创建)模块的问题),将命令行参数传递给它们更多地对应于
python my_script.py <arguments>
Run Code Online (Sandbox Code Playgroud)
即命令行不会被 interpeter 自动处理,而是需要在脚本中处理。
multiprocessing提供了一个名为 的函数multiprocessing.freeze_support(),它可以正确处理命令行参数,并且可以如Bastian 的回答所示使用:
if __name__ == '__main__':
# needed for Cython, as it doesn't set `frozen`-attribute
setattr(sys, 'frozen', True)
# parse command line options and execute it if needed
multiprocessing.freeze_support()
Run Code Online (Sandbox Code Playgroud)
但是,此解决方案仅适用于 Windows,如代码所示:
def freeze_support(self):
'''Check whether this is a fake forked process in a frozen executable.
If so then run code specified by commandline and exit.
'''
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
from .spawn import freeze_support
freeze_support()
Run Code Online (Sandbox Code Playgroud)
有一个错误报告:在 win32 之外需要 multiprocessing freeze_support这可能/可能不会很快修复。
正如在上面的错误报告中所解释的,仅仅设置frozen属性True并freeze_support直接从调用是不够的,multiprocessing.spawn因为信号量跟踪器没有正确处理。
我看到有两种选择:要么使用上述错误报告中尚未发布的补丁修补您的安装,要么使用下面介绍的自己动手做的方法。
这是这个答案的早期版本,它更“实验性”,但提供了更多的见解/细节,并以有点自己动手的方式提出了一个解决方案。
我在 linux 上,所以我mp.set_start_method('spawn')用来模拟 windows 的行为。
在spawn-mode 中会发生什么?让我们添加一些sleeps,以便我们可以调查过程:
#bomb.py
import multiprocessing as mp
import sys
import time
def worker():
time.sleep(50)
print('Worker')
return
if __name__ == '__main__':
print("Starting...")
time.sleep(20)
mp.set_start_method('spawn') ## use spawn!
jobs = []
for i in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()
Run Code Online (Sandbox Code Playgroud)
通过使用pgrep python我们可以看到,一开始只有一个python进程,然后是7(!)个不同的pids。我们可以通过cat /proc/<pid>/cmdline. 5 个新进程有命令行
-c "from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-fork
Run Code Online (Sandbox Code Playgroud)
和一:
-c "from multiprocessing.semaphore_tracker import main;main(4)"
Run Code Online (Sandbox Code Playgroud)
这意味着,父进程启动了 6 个新的 Python 解释器实例,每个新启动的解释器都执行通过命令行选项从父进程发送的代码,信息通过管道共享。这 6 个 python 实例之一是跟踪器,它观察整个事情。
好的,如果 cythonized+embeded 会发生什么?和普通的python一样,唯一的区别bomb是启动了-executable而不是python。但与 python 解释器不同的是,它不执行/不知道命令行参数,因此该main函数一遍又一遍地运行。
有一个简单的解决方法:让bomb-exe 启动 python 解释器
...
if __name__ == '__main__':
mp.set_executable(<PATH TO PYTHON>)
....
Run Code Online (Sandbox Code Playgroud)
现在bomb不再是多处理炸弹!
然而,目标可能不是有一个 python 解释器,所以我们需要让我们的程序知道可能的命令行:
import re
......
if __name__ == '__main__':
if len(sys.argv)==3: # should start in semaphore_tracker mode
nr=list(map(int, re.findall(r'\d+',sys.argv[2])))
sys.argv[1]='--multiprocessing-fork' # this canary is needed for multiprocessing module to work
from multiprocessing.semaphore_tracker import main;main(nr[0])
elif len(sys.argv)>3: # should start in slave mode
fd, pipe=map(int, re.findall(r'\d+',sys.argv[2]))
print("I'm a slave!, fd=%d, pipe=%d"%(fd,pipe))
sys.argv[1]='--multiprocessing-fork' # this canary is needed for multiprocessing module to work
from multiprocessing.spawn import spawn_main;
spawn_main(tracker_fd=fd, pipe_handle=pipe)
else: #main mode
print("Starting...")
mp.set_start_method('spawn')
jobs = []
for i in range(5):
p = mp.Process(target=worker)
jobs.append(p)
p.start()
Run Code Online (Sandbox Code Playgroud)
现在,我们的炸弹不需要独立的 python 解释器,并且在工作完成后停止。请注意以下事项:
bomb应该在哪种模式下启动不是很安全,但我希望你明白要点--multiprocessing-fork只是一只金丝雀,它不做任何事情,它只是必须在那里,请参见此处。注意:更改后的代码也可以与python一起使用,因为执行"from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=5, pipe_handle=11)" --multiprocessing-forkpython后更改了sys.argv所以代码不再看到原始命令行并且len(sys.argv)是1.
| 归档时间: |
|
| 查看次数: |
1822 次 |
| 最近记录: |