Max*_*Max 82 python pool multiprocessing
是否有可能创建一个非守护进程的python池?我希望一个池能够调用一个内部有另一个池的函数.
我想要这个,因为deamon进程无法创建进程.具体来说,它会导致错误:
AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)
例如,考虑function_a具有运行的池的场景,该池具有运行function_b的池function_c.此函数链将失败,因为function_b正在守护进程中运行,并且守护进程无法创建进程.
Chr*_*ndt 105
在multiprocessing.pool.Pool类创建在其工作进程__init__的方法,使他们邪,开始他们,这是不可能自己重新设置daemon属性False在开始之前(事后这是不允许的了).但是你可以创建自己的子类multiprocesing.pool.Pool(multiprocessing.Pool只是一个包装器函数)并替换你自己的multiprocessing.Process子类,它总是非守护进程,用于工作进程.
以下是如何执行此操作的完整示例.最重要的部分是两个阶级NoDaemonProcess,并MyPool在顶部和调用pool.close(),并pool.join()在你的MyPool末实例.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time
from random import randint
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
Process = NoDaemonProcess
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = multiprocessing.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test():
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
if __name__ == '__main__':
test()
Run Code Online (Sandbox Code Playgroud)
Mas*_*ano 17
我必须在Python 3.7中使用非守护程序池,并最终改编了接受的答案中发布的代码。下面是创建非守护程序池的代码段:
class NoDaemonProcess(multiprocessing.Process):
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, value):
pass
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(MyPool, self).__init__(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)
由于的当前实现multiprocessing已被广泛地重构为基于上下文,因此我们需要提供一个NoDaemonContext具有NoDaemonProcessas属性的类。MyPool然后将使用该上下文,而不是默认上下文。
就是说,我应该警告这种方法至少有两个警告:
multiprocessing软件包的实现细节,因此可能随时中断。multiprocessing说得那么难用非恶魔的过程,其中有许多是解释在这里。我认为最引人注目的是:
至于允许子线程使用子进程来产生自己的子进程,如果父线程或子线程在子进程完成并返回之前终止,则可能会产生一些僵尸“孙子”。
Acu*_*nus 11
从 Python 3.8 开始,concurrent.futures.ProcessPoolExecutor没有这个限制。它可以有一个完全没有问题的嵌套进程池:
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time
def pid():
return current_process().pid
def _square(i): # Runs in inner_pool
square = i ** 2
time.sleep(i / 10)
print(f'{pid()=} {i=} {square=}')
return square
def _sum_squares(i, j): # Runs in outer_pool
with Pool(max_workers=2) as inner_pool:
squares = inner_pool.map(_square, (i, j))
sum_squares = sum(squares)
time.sleep(sum_squares ** .5)
print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
return sum_squares
def main():
with Pool(max_workers=3) as outer_pool:
for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
print(f'{pid()=} {sum_squares=}')
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
上面的演示代码是用 Python 3.8 测试的。
ProcessPoolExecutor但是, 的一个限制是它没有maxtasksperchild。如果您需要这个,请考虑Massimiliano的答案。
信用:jfs的回答
在某些 Python 版本中,将标准 Pool 替换为 custom 可能会引发错误:AssertionError: group argument must be None for now.
在这里,我找到了一个可以提供帮助的解决方案:
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
@property
def daemon(self):
return False
@daemon.setter
def daemon(self, val):
pass
class NoDaemonProcessPool(multiprocessing.pool.Pool):
def Process(self, *args, **kwds):
proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
proc.__class__ = NoDaemonProcess
return proc
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
36547 次 |
| 最近记录: |