Gab*_*abe 56 python multiprocessing
我想知道python的Multiprocessing.Pool类使用map,imap和map_async的方式.我的特殊问题是我想在一个迭代器上映射,该迭代器创建了占用大量内存的对象,并且不希望所有这些对象同时生成到内存中.我想看看各种map()函数是否会使我的迭代器变干,或者只是在子进程缓慢前进时智能地调用next()函数,所以我这样修改了一些测试:
def g():
for el in xrange(100):
print el
yield el
def f(x):
time.sleep(1)
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
go = g()
g2 = pool.imap(f, go)
g2.next()
Run Code Online (Sandbox Code Playgroud)
依此类推map,imap和map_async.这是最公然的例子,因为简单地在g2上单次调用next()会从我的生成器g()中打印出所有元素,而如果imap这样做'懒惰',我希望它只调用go.next ()一次,因此只打印'1'.
有人可以清理正在发生的事情,并且如果有某种方法让进程池'懒惰'根据需要评估迭代器吗?
谢谢,
加布
unu*_*tbu 34
让我们先看一下程序的结尾.
多处理模块用于在程序结束时atexit
调用multiprocessing.util._exit_function
.
如果删除g2.next()
,程序将很快结束.
在_exit_function
最终调用Pool._terminate_pool
.主线程将状态pool._task_handler._state
从更改RUN
为TERMINATE
.同时,当pool._task_handler
线程Pool._handle_tasks
达到条件时,线程正在循环并挽救
if thread._state:
debug('task handler found thread._state != RUN')
break
Run Code Online (Sandbox Code Playgroud)
(参见/usr/lib/python2.6/multiprocessing/pool.py)
这就是阻止任务处理程序完全使用你的生成器的原因g()
.如果你看,Pool._handle_tasks
你会看到
for i, task in enumerate(taskseq):
...
try:
put(task)
except IOError:
debug('could not put task on queue')
break
Run Code Online (Sandbox Code Playgroud)
这是消耗您的发电机的代码.(taskseq
不完全是你的发电机,但是taskseq
消耗的,你的发电机也是如此.)
相反,当你调用g2.next()
主线程调用时IMapIterator.next
,并在它到达时等待self._cond.wait(timeout)
.
主线程正在等待而不是调用_exit_function
是允许任务处理程序线程正常运行的,这意味着完全消耗生成器作为函数中s' 中put
的任务.worker
inqueue
Pool._handle_tasks
底线是所有Pool
地图函数都使用它给出的整个迭代.如果你想以块的形式使用生成器,你可以这样做:
import multiprocessing as mp
import itertools
import time
def g():
for el in xrange(50):
print el
yield el
def f(x):
time.sleep(1)
return x * x
if __name__ == '__main__':
pool = mp.Pool(processes=4) # start 4 worker processes
go = g()
result = []
N = 11
while True:
g2 = pool.map(f, itertools.islice(go, N))
if g2:
result.extend(g2)
time.sleep(1)
else:
break
print(result)
Run Code Online (Sandbox Code Playgroud)
您想要的内容在NuMap包中实现,来自网站:
\n\n\n\nNuMap 是并行(基于线程或进程、本地或远程)、缓冲、多任务、itertools.imap 或 multiprocessing.Pool.imap\n 函数替换。与 imap 一样,它对序列或可迭代元素的函数求值,并且它是惰性执行的。\n 可以通过 \xe2\x80\x9cstride\xe2\x80\x9d 和 \xe2\x80\x9cbuffer\xe2\x80\x9d 参数调整惰性。
\n
归档时间: |
|
查看次数: |
17833 次 |
最近记录: |