use*_*114 12 python unix garbage-collection multiprocessing
在py2.6 +中,multiprocessing
模块提供了一个Pool
类,因此可以:
class Volatile(object):
def do_stuff(self, ...):
pool = multiprocessing.Pool()
return pool.imap(...)
Run Code Online (Sandbox Code Playgroud)
但是,使用2.7.2的标准Python实现,这种方法很快就会导致"IOError:[Errno 24]打开的文件过多".显然,pool
对象永远不会被垃圾收集,因此它的进程永远不会终止,累积内部打开的任何描述符.我认为这是因为以下工作:
class Volatile(object):
def do_stuff(self, ...):
pool = multiprocessing.Pool()
result = pool.map(...)
pool.terminate()
return result
Run Code Online (Sandbox Code Playgroud)
我想保持"懒惰"迭代器的方法imap
; 在这种情况下垃圾收集器如何工作?如何修复代码?
最后,我最终传递了pool
引用并在pool.imap
迭代器完成后手动终止它:
class Volatile(object):
def do_stuff(self, ...):
pool = multiprocessing.Pool()
return pool, pool.imap(...)
def call_stuff(self):
pool, results = self.do_stuff()
for result in results:
# lazy evaluation of the imap
pool.terminate()
Run Code Online (Sandbox Code Playgroud)
如果将来有人偶然发现这个解决方案:chunksize参数非常重要Pool.imap
(与普通的相反Pool.map
,无关紧要).我手动设置它,以便每个进程接收1 + len(input) / len(pool)
作业.将它保留为默认值chunksize=1
给了我相同的性能,好像我根本没有使用并行处理......糟糕.
我想使用有序imap
与有序没有真正的好处map
,我个人更喜欢迭代器.
在python中,你基本上不能保证什么时候会被破坏,在这种情况下,这不是多处理池的设计方式.
正确的做法是在多次调用函数时共享一个池.最简单的方法是将池存储为类(或可能是实例)变量:
class Dispatcher:
pool = multiprocessing.Pool()
def do_stuff(self, ...):
result = self.pool.map(...)
return result
Run Code Online (Sandbox Code Playgroud)
事实上,即使pool
删除了对该对象的所有用户引用,并且队列代码中没有任何任务,并且完成了所有垃圾收集,这些进程仍然在操作系统中保持不可用的僵尸状态- 而且我们有 3 个僵尸服务线程Pool
悬挂(Python 2.7 和 3.4):
>>> del pool
>>> gc.collect()
0
>>> gc.garbage
[]
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>,
<Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>]
Run Code Online (Sandbox Code Playgroud)
进一步的Pool()
将添加越来越多的进程和线程僵尸......它们会一直保留到主进程终止。
它需要特殊的戳来停止此类僵尸池 - 通过其服务线程_handle_workers
:
>>> ths = threading.enumerate()
>>> for th in ths:
... try: th.name, th._state, th._Thread__target
... except AttributeError: pass
...
('MainThread', 1, None)
('Thread-8', 0, <function _handle_tasks at 0x01462A30>)
('Thread-9', 0, <function _handle_results at 0x014629F0>)
('Thread-7', 0, <function _handle_workers at 0x01462A70>)
>>> ths[-1]._state = multiprocessing.pool.CLOSE # or TERMINATE
>>> threading.enumerate()
[<_MainThread(MainThread, started 5632)>]
>>>
Run Code Online (Sandbox Code Playgroud)
这将终止其他服务线程并终止子进程。
我认为一个问题是, Python 库中存在资源泄漏错误weakref
,可以通过正确使用's 来修复。
另一点是,Pool
创建和终止的成本很高(包括每个池 3 个服务线程,仅用于管理!),并且通常没有理由拥有比 CPU 核心(高 CPU 负载)多得多的工作进程或超过根据规定的有限数量。到另一个限制资源(例如网络带宽)。因此,将池视为一个单一的应用程序全局资源(可以选择通过超时管理)而不是一个仅由闭包(或由于错误而导致的终止()解决方法)保存的快速对象是合理的。
例如:
try:
_unused = pool # reload safe global var
except NameError:
pool = None
def get_pool():
global pool
if pool is None:
atexit.register(stop_pool)
pool = Pool(CPUCORES)
return pool
def stop_pool():
global pool
if pool:
pool.terminate()
pool = None
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
6738 次 |
最近记录: |