Python Multiprocessing.Pool延迟迭代

Gab*_*abe 56 python multiprocessing

我想知道python的Multiprocessing.Pool类使用map,imap和map_async的方式.我的特殊问题是我想在一个迭代器上映射,该迭代器创建了占用大量内存的对象,并且不希望所有这些对象同时生成到内存中.我想看看各种map()函数是否会使我的迭代器变干,或者只是在子进程缓慢前进时智能地调用next()函数,所以我这样修改了一些测试:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()
Run Code Online (Sandbox Code Playgroud)

依此类推map,imap和map_async.这是最公然的例子,因为简单地在g2上单次调用next()会从我的生成器g()中打印出所有元素,而如果imap这样做'懒惰',我希望它只调用go.next ()一次,因此只打印'1'.

有人可以清理正在发生的事情,并且如果有某种方法让进程池'懒惰'根据需要评估迭代器吗?

谢谢,

加布

unu*_*tbu 34

让我们先看一下程序的结尾.

多处理模块用于在程序结束时atexit调用multiprocessing.util._exit_function.

如果删除g2.next(),程序将很快结束.

_exit_function最终调用Pool._terminate_pool.主线程将状态pool._task_handler._state从更改RUNTERMINATE.同时,当pool._task_handler线程Pool._handle_tasks达到条件时,线程正在循环并挽救

            if thread._state:
                debug('task handler found thread._state != RUN')
                break
Run Code Online (Sandbox Code Playgroud)

(参见/usr/lib/python2.6/multiprocessing/pool.py)

这就是阻止任务处理程序完全使用你的生成器的原因g().如果你看,Pool._handle_tasks你会看到

        for i, task in enumerate(taskseq):
            ...
            try:
                put(task)
            except IOError:
                debug('could not put task on queue')
                break
Run Code Online (Sandbox Code Playgroud)

这是消耗您的发电机的代码.(taskseq不完全是你的发电机,但是taskseq消耗的,你的发电机也是如此.)

相反,当你调用g2.next()主线程调用时IMapIterator.next,并在它到达时等待self._cond.wait(timeout).

主线程正在等待而不是调用_exit_function是允许任务处理程序线程正常运行的,这意味着完全消耗生成器作为函数中s' 中put的任务.workerinqueuePool._handle_tasks

底线是所有Pool地图函数都使用它给出的整个迭代.如果你想以块的形式使用生成器,你可以这样做:

import multiprocessing as mp
import itertools
import time


def g():
    for el in xrange(50):
        print el
        yield el


def f(x):
    time.sleep(1)
    return x * x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes
    go = g()
    result = []
    N = 11
    while True:
        g2 = pool.map(f, itertools.islice(go, N))
        if g2:
            result.extend(g2)
            time.sleep(1)
        else:
            break
    print(result)
Run Code Online (Sandbox Code Playgroud)

  • 很好的答案,我最终重新实现了一个同时消耗元素的线程池,但是你的islice解决方案对我来说的工作要少得多,哦,好吧:-).我试着查看pool.py一些,并注意到map/imap/map_async函数确实似乎立刻吃掉了迭代器.我不清楚这是否真的有必要,特别是在标准Pool.map()的情况下? (3认同)
  • @Gabe:为了及时使用迭代器,我认为必须在“Pool”中编写一些额外的信号机制,以告诉任务处理程序何时将更多任务“放入”“inqueue”中。也许有可能,但目前在“Pool”中不存在,并且也可能会减慢进程速度。 (2认同)

let*_*aik 5

您想要的内容在NuMap包中实现,来自网站:

\n\n
\n

NuMap 是并行(基于线程或进程、本地或远程)、缓冲、多任务、itertools.imap 或 multiprocessing.Pool.imap\n 函数替换。与 imap 一样,它对序列或可迭代元素的函数求值,并且它是惰性执行的。\n 可以通过 \xe2\x80\x9cstride\xe2\x80\x9d 和 \xe2\x80\x9cbuffer\xe2\x80\x9d 参数调整惰性。

\n
\n