我可以在Pool.imap调用的函数中使用多处理队列吗?

Ols*_*son 25 python queue pool multiprocessing

我正在使用python 2.7,并尝试在自己的进程中运行一些CPU繁重的任务.我希望能够将消息发送回父进程,以使其了解进程的当前状态.多处理队列似乎是完美的,但我无法弄清楚如何让它工作.

所以,这是我的基本工作示例,减去队列的使用.

import multiprocessing as mp
import time

def f(x):
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print str(results.next())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

我尝试过以几种方式传递Queue,并且它们收到错误消息"RuntimeError:Queue对象应该只通过继承在进程之间共享".这是我根据我发现的早期答案尝试的方法之一.(我在尝试使用Pool.map_async和Pool.imap时遇到同样的问题)

import multiprocessing as mp
import time

def f(args):
    x = args[0]
    q = args[1]
    q.put(str(x))
    time.sleep(0.1)
    return x*x

def main():
    q = mp.Queue()
    pool = mp.Pool()
    results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))

    print str(q.get())

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

最后,0适应方法(使其成为全局)不会生成任何消息,它只是锁定.

import multiprocessing as mp
import time

q = mp.Queue()

def f(x):
    q.put(str(x))
    return x*x

def main():
    pool = mp.Pool()
    results = pool.imap_unordered(f, range(1, 6))
    time.sleep(1)

    print q.get()

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

我知道它可能会直接使用multiprocessing.Process,并且还有其他库可以实现这一点,但我不想退出标准库函数,这是非常合适的,直到我确定它不仅仅是我的缺乏知识使我无法利用它们.

谢谢.

Ols*_*son 47

诀窍是将Queue作为参数传递给初始化器.似乎适用于所有Pool调度方法.

import multiprocessing as mp

def f(x):
    f.q.put('Doing: ' + str(x))
    return x*x

def f_init(q):
    f.q = q

def main():
    jobs = range(1,6)

    q = mp.Queue()
    p = mp.Pool(None, f_init, [q])
    results = p.imap(f, jobs)
    p.close()

    for i in range(len(jobs)):
        print q.get()
        print results.next()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

  • 非常好地演示了`multiprocessing.Pool`的`initializer`和`initargs`参数的用途和用途! (4认同)
  • @kepkin在Python中,每个函数都是一个对象(参见http://docs.python.org/reference/datamodel.html#the-standard-type-hierarchy Callable Types).因此,fq在函数对象f上设置名为q的属性.它只是一种快速轻量级的方法来保存Queue对象以供以后使用. (2认同)
  • fq = q 不是猴子补丁的一个例子吗?http://stackoverflow.com/questions/5626193/what-is-monkey-patch (2认同)