C.B*_*.B. 27 python memory pool multiprocessing
这是程序:
#!/usr/bin/python
import multiprocessing
def dummy_func(r):
pass
def worker():
pass
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=16)
for index in range(0,100000):
pool.apply_async(worker, callback=dummy_func)
# clean up
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我发现内存使用(包括VIRT和RES)一直持续到close()/ join(),有没有解决方法摆脱这个?我用2.7尝试了maxtasksperchild,但它也没有帮助.
我有一个更复杂的程序,调用apply_async()〜6M次,并且在~1.5M点我已经有6G + RES,为了避免所有其他因素,我将程序简化为以上版本.
编辑:
原来这个版本效果更好,感谢大家的意见:
#!/usr/bin/python
import multiprocessing
ready_list = []
def dummy_func(index):
global ready_list
ready_list.append(index)
def worker(index):
return index
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=16)
result = {}
for index in range(0,1000000):
result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
for ready in ready_list:
result[ready].wait()
del result[ready]
ready_list = []
# clean up
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
我没有放任何锁,因为我认为主进程是单线程的(回调或多或少像我读过的每个文档的事件驱动的东西).
我将v1的索引范围更改为1,000,000,与v2相同并进行了一些测试 - 这对我来说很奇怪v2甚至比v1快〜10%(33s vs 37s),也许v1做了太多的内部列表维护工作.v2绝对是内存使用的赢家,它从未超过300M(VIRT)和50M(RES),而v1曾经是370M/120M,最好的是330M/85M.所有数字仅为3~4次测试,仅供参考.
ded*_*ddu 19
我最近遇到了内存问题,因为我多次使用多处理功能,所以它不断产生进程,并将它们留在内存中.
这是我现在使用的解决方案:
def myParallelProcess(ahugearray)
from multiprocessing import Pool
from contextlib import closing
with closing( Pool(15) ) as p:
res = p.imap_unordered(simple_matching, ahugearray, 100)
return res
Run Code Online (Sandbox Code Playgroud)
我❤ 用
使用map_async
而不是apply_async
避免过多的内存使用.
对于第一个示例,请更改以下两行:
for index in range(0,100000):
pool.apply_async(worker, callback=dummy_func)
Run Code Online (Sandbox Code Playgroud)
至
pool.map_async(worker, range(100000), callback=dummy_func)
Run Code Online (Sandbox Code Playgroud)
它会在你看到它的内存使用情况之前眨眼结束top
.将列表更改为更大的列表以查看差异.但是,map_async
如果没有__len__
方法,注意将首先将传递给它的迭代转换为列表以计算其长度.如果你有一个包含大量元素的迭代器,你可以用它itertools.islice
来处理更小的块.
我在现实生活中有一个记忆问题,有更多的数据,最后发现罪魁祸首apply_async
.
PS,就内存使用而言,你的两个例子没有明显的区别.
我正在处理一个非常大的 3d 点云数据集。我尝试使用多处理模块来加速处理,但我开始出现内存不足错误。经过一些研究和测试,我确定我填充要处理的任务队列的速度比子进程清空队列的速度要快得多。我确定通过分块,或使用 map_async 或其他我可以调整负载的东西,但我不想对周围的逻辑进行重大更改。
我遇到的愚蠢的解决方案是pool._cache
间歇性地检查长度,如果缓存太大,则等待队列清空。
在我的主循环中,我已经有了一个计数器和一个状态代码:
# Update status
count += 1
if count%10000 == 0:
sys.stdout.write('.')
if len(pool._cache) > 1e6:
print "waiting for cache to clear..."
last.wait() # Where last is assigned the latest ApplyResult
Run Code Online (Sandbox Code Playgroud)
因此,每向池中插入 10k 次,我就会检查是否有超过 100 万个操作排队(主进程中使用了大约 1G 的内存)。当队列已满时,我只等待最后插入的作业完成。
现在我的程序可以运行几个小时而不会耗尽内存。当工作人员继续处理数据时,主进程只是偶尔暂停。
顺便说一句,_cache 成员记录在多处理模块池示例中:
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
25893 次 |
最近记录: |