多处理和垃圾收集

use*_*114 12 python unix garbage-collection multiprocessing

在py2.6 +中,multiprocessing模块提供了一个Pool类,因此可以:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool.imap(...)
Run Code Online (Sandbox Code Playgroud)

但是,使用2.7.2的标准Python实现,这种方法很快就会导致"IOError:[Errno 24]打开的文件过多".显然,pool对象永远不会被垃圾收集,因此它的进程永远不会终止,累积内部打开的任何描述符.我认为这是因为以下工作:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        result = pool.map(...)
        pool.terminate()
        return result
Run Code Online (Sandbox Code Playgroud)

我想保持"懒惰"迭代器的方法imap; 在这种情况下垃圾收集器如何工作?如何修复代码?

use*_*114 8

最后,我最终传递了pool引用并在pool.imap迭代器完成后手动终止它:

class Volatile(object):
    def do_stuff(self, ...):
        pool = multiprocessing.Pool()
        return pool, pool.imap(...)

    def call_stuff(self):
        pool, results = self.do_stuff()
        for result in results:
            # lazy evaluation of the imap
        pool.terminate()
Run Code Online (Sandbox Code Playgroud)

如果将来有人偶然发现这个解决方案:chunksize参数非常重要Pool.imap(与普通的相反Pool.map,无关紧要).我手动设置它,以便每个进程接收1 + len(input) / len(pool)作业.将它保留为默认值chunksize=1给了我相同的性能,好像我根本没有使用并行处理......糟糕.

我想使用有序imap与有序没有真正的好处map,我个人更喜欢迭代器.


Jam*_*mes 5

在python中,你基本上不能保证什么时候会被破坏,在这种情况下,这不是多处理池的设计方式.

正确的做法是在多次调用函数时共享一个池.最简单的方法是将池存储为类(或可能是实例)变量:

class Dispatcher:
    pool = multiprocessing.Pool()
    def do_stuff(self, ...):
        result = self.pool.map(...)
        return result
Run Code Online (Sandbox Code Playgroud)


kxr*_*kxr 5

事实上,即使pool删除了对该对象的所有用户引用,并且队列代码中没有任何任务,并且完成了所有垃圾收集,这些进程仍然在操作系统中保持不可用的僵尸状态- 而且我们有 3 个僵尸服务线程Pool悬挂(Python 2.7 和 3.4):

>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, 
 <Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]
Run Code Online (Sandbox Code Playgroud)

进一步的Pool()将添加越来越多的进程和线程僵尸......它们会一直保留到主进程终止。

它需要特殊的戳来停止此类僵尸池 - 通过其服务线程_handle_workers

>>> ths = threading.enumerate()
>>> for th in ths: 
...     try: th.name, th._state, th._Thread__target
...     except AttributeError: pass
...     
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE  # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>> 
Run Code Online (Sandbox Code Playgroud)

这将终止其他服务线程并终止子进程。


我认为一个问题是, Python 库中存在资源泄漏错误weakref,可以通过正确使用's 来修复。

另一点是,Pool创建和终止的成本很高(包括每个池 3 个服务线程,仅用于管理!),并且通常没有理由拥有比 CPU 核心(高 CPU 负载)多得多的工作进程或超过根据规定的有限数量。到另一个限制资源(例如网络带宽)。因此,将池视为一个单一的应用程序全局资源(可以选择通过超时管理)而不是一个仅由闭包(或由于错误而导致的终止()解决方法)保存的快速对象是合理的。

例如:

try:
    _unused = pool   # reload safe global var
except NameError:
    pool = None

def get_pool():
    global pool
    if pool is None:
        atexit.register(stop_pool)
        pool = Pool(CPUCORES)
    return pool

def stop_pool():
    global pool
    if pool:
        pool.terminate()
        pool = None
Run Code Online (Sandbox Code Playgroud)