Python进程池非守护进程?

Max*_*Max 82 python pool multiprocessing

是否有可能创建一个非守护进程的python池?我希望一个池能够调用一个内部有另一个池的函数.

我想要这个,因为deamon进程无法创建进程.具体来说,它会导致错误:

AssertionError: daemonic processes are not allowed to have children
Run Code Online (Sandbox Code Playgroud)

例如,考虑function_a具有运行的池的场景,该池具有运行function_b的池function_c.此函数链将失败,因为function_b正在守护进程中运行,并且守护进程无法创建进程.

Chr*_*ndt 105

multiprocessing.pool.Pool类创建在其工作进程__init__的方法,使他们邪,开始他们,这是不可能自己重新设置daemon属性False在开始之前(事后这是不允许的了).但是你可以创建自己的子类multiprocesing.pool.Pool(multiprocessing.Pool只是一个包装器函数)并替换你自己的multiprocessing.Process子类,它总是非守护进程,用于工作进程.

以下是如何执行此操作的完整示例.最重要的部分是两个阶级NoDaemonProcess,并MyPool在顶部和调用pool.close(),并pool.join()在你的MyPool末实例.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Run Code Online (Sandbox Code Playgroud)

  • 使用"MyPool"而不是默认的"Pool"有什么缺点?换句话说,为了换取启动子流程的灵活性,我需要支付多少费用?(如果没有成本,可能是标准的"池"将使用非守护进程). (27认同)
  • @machen是的,不幸的是这是真的.在Python 3.6中,`Pool`类已被广泛重构,因此`Process`不再是一个简单的属性,而是一个方法,它返回从*context*获得的流程实例.我尝试覆盖此方法以返回一个`NoDaemonPool`实例,但是这会导致异常`AssertionError:当使用Pool时,不允许守护进程具有子进程. (4认同)
  • 辛苦了 如果有人因此而发生内存泄漏,请尝试使用“ with close(MyPool(processes = num_cpu))作为池:”来正确处理该池。 (2认同)

tim*_*ner 17

多处理模块有一个漂亮的界面使用与进程池线程.根据您当前的用例,您可以考虑使用multiprocessing.pool.ThreadPool外部池,这将导致线程(允许从内部生成进程)而不是进程.

它可能受到GIL的限制,但在我的特定情况下(我测试了两者),这里Pool创建的外部进程的启动时间远远超过了解决方案.ThreadPool


这真的很容易掉ProcessesThreads.阅读更多有关如何在此处此处使用ThreadPool解决方案的信息.


Mas*_*ano 17

我必须在Python 3.7中使用非守护程序池,并最终改编了接受的答案中发布的代码。下面是创建非守护程序池的代码段:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

由于的当前实现multiprocessing已被广泛地重构为基于上下文,因此我们需要提供一个NoDaemonContext具有NoDaemonProcessas属性的类。MyPool然后将使用该上下文,而不是默认上下文。

就是说,我应该警告这种方法至少有两个警告:

  1. 它仍然取决于multiprocessing软件包的实现细节,因此可能随时中断。
  2. 为什么有正当的理由multiprocessing说得那么难用非恶魔的过程,其中有许多是解释在这里。我认为最引人注目的是:

    至于允许子线程使用子进程来产生自己的子进程,如果父线程或子线程在子进程完成并返回之前终止,则可能会产生一些僵尸“孙子”。

  • 谢谢你这个@massimiliano (2认同)
  • 关于警告:我的用例是并行任务,但孙子将信息返回给他们的父母,而他们的父母又在进行一些所需的本地处理后将信息返回给**他们的**父母。因此,每个级别/分支都对其所有叶子节点进行显式等待。如果您明确必须等待生成的进程完成,那么该警告是否仍然适用? (2认同)

Acu*_*nus 11

从 Python 3.8 开始,concurrent.futures.ProcessPoolExecutor没有这个限制。它可以有一个完全没有问题的嵌套进程池:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

上面的演示代码是用 Python 3.8 测试的。

ProcessPoolExecutor但是, 的一个限制是它没有maxtasksperchild。如果您需要这个,请考虑Massimiliano答案

信用:jfs的回答

  • 现在这显然是最好的解决方案,因为它需要最少的更改。 (3认同)
  • 完美运作!...作为旁注,在“ProcessPoolExecutor.Pool”中使用子“multiprocessing.Pool”也是可能的! (2认同)

Att*_*tio 7

在某些 Python 版本中,将标准 Pool 替换为 custom 可能会引发错误:AssertionError: group argument must be None for now.

在这里,我找到了一个可以提供帮助的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc
Run Code Online (Sandbox Code Playgroud)