Python multiprocessing - 'Queue' object has no attribute 'task_done' / 'join'

get*_*lad 6 python multithreading multiprocessing python-2.7

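I am rewriting a threaded process to use a multiprocessing queue, to try to speed up a large calculation. I have gotten about 95% of the way there, but I can't figure out how to signal when the `Queue` is empty using `multiprocessing`.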


My original code looked like this:

```python
from Queue import Queue  # Python 2 module name; it is "queue" in Python 3
from threading import Thread

num_fetch_threads = 4
enclosure_queue = Queue()

for i in range(num_fetch_threads):
    worker = Thread(target=run_experiment, args=(i, enclosure_queue))
    worker.daemon = True
    worker.start()

for experiment in experiment_collection:
    enclosure_queue.put((experiment, otherVar))

enclosure_queue.join()  # blocks until task_done() was called for every item
```

The worker function looks like this:

```python
def run_experiment(i, q):
    while True:
        experiment, otherVar = q.get()  # take the next task off the queue
        # ... do stuff ...
        q.task_done()  # tell the queue this task is finished
```

My new code looks like this:

```python
from multiprocessing import Process, Queue

num_fetch_threads = 4
enclosure_queue = Queue()

for i in range(num_fetch_threads):
    worker = Process(target=run_experiment, args=(i, enclosure_queue))
    worker.daemon = True
    worker.start()

for experiment in experiment_collection:
    enclosure_queue.put((experiment, otherVar))

worker.join()  ## I only put this here because enclosure_queue.join() is not available
```

And the new worker function:

```python
def run_experiment(i, q):
    while True:
        experiment, otherVar = q.get()  # take the next task off the queue
        # ... do stuff ...
        ## not sure what should go here
```

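I've been reading the docs and searching Google, but I can't figure out what I'm missing. I know that `task_done`/`join` are not part of the `multiprocessing` `Queue` class, but it's not clear what I should use instead.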


> "They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5's Queue.Queue class." (source)
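The same section of the `multiprocessing` documentation goes on to describe `multiprocessing.JoinableQueue`, a `Queue` subclass that does provide `task_done()` and `join()`, so it can stand in for `Queue.Queue` in the threaded design. A minimal sketch, assuming the tasks are `(experiment, other_var)` pairs as in the question; `run_all`, the second `results` queue, and the multiplication standing in for the real experiment are hypothetical placeholders:

```python
from multiprocessing import JoinableQueue, Process, Queue


def run_experiment(i, tasks, results):
    # Worker loop: runs forever; the daemon flag lets the main process exit.
    while True:
        experiment, other_var = tasks.get()
        # Placeholder computation (hypothetical): replace with the real experiment.
        results.put((experiment, experiment * other_var))
        # Exactly one task_done() per get(), so tasks.join() can return.
        tasks.task_done()


def run_all(experiment_collection, other_var, num_workers=4):
    tasks = JoinableQueue()
    results = Queue()
    for i in range(num_workers):
        worker = Process(target=run_experiment, args=(i, tasks, results))
        worker.daemon = True
        worker.start()
    n = 0
    for experiment in experiment_collection:
        tasks.put((experiment, other_var))
        n += 1
    # Blocks until task_done() has been called once for every item put().
    tasks.join()
    # Each result was put() before its task_done(), so all n can be read now.
    return dict(results.get() for _ in range(n))


if __name__ == "__main__":
    print(run_all(range(10), 3))
```

Because `join()` only waits for the tasks, the workers stay alive, blocked in `get()`; marking them `daemon` lets the main process terminate them when it exits.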


But without either of those, I'm not sure how the queue knows it is done, or how to continue with the rest of the program.
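With a plain `multiprocessing.Queue`, a common way to signal "no more work" is an in-band sentinel: after the real tasks, put one sentinel per worker, let each worker leave its loop when it receives one, and then join every worker (the question's code joins only the last `worker` it created). A minimal sketch; `SENTINEL`, `run_all`, and the placeholder multiplication are hypothetical:

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker; must never be a real task


def run_experiment(i, tasks, results):
    while True:
        item = tasks.get()
        if item is SENTINEL:
            break  # stop signal: leave the loop so this process can end
        experiment, other_var = item
        # Placeholder computation (hypothetical): replace with the real experiment.
        results.put((experiment, experiment * other_var))


def run_all(experiment_collection, other_var, num_workers=4):
    tasks = Queue()
    results = Queue()
    workers = [Process(target=run_experiment, args=(i, tasks, results))
               for i in range(num_workers)]
    for w in workers:
        w.start()
    n = 0
    for experiment in experiment_collection:
        tasks.put((experiment, other_var))
        n += 1
    for _ in workers:
        tasks.put(SENTINEL)  # one stop signal per worker, queued after all real tasks
    # Drain results before join(): a process that has put items on a queue
    # may not terminate until those items are flushed to the underlying pipe.
    out = dict(results.get() for _ in range(n))
    for w in workers:
        w.join()  # join every worker, not just the last one created
    return out


if __name__ == "__main__":
    print(run_all(range(10), 3))
```

Reading the results before the `join()` calls is deliberate: the documentation warns that joining a process that still has buffered queue items can deadlock.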


nne*_*neo 2

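Consider using a `multiprocessing.Pool` instead of managing the workers by hand. A Pool takes care of dispatching tasks to the workers, offers convenience functions such as `map` and `apply`, and supports the `.close` and `.join` methods. The Pool handles the queues between processes and collects the results for you. With `multiprocessing.Pool`, your code might look like this: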

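```python
from multiprocessing import Pool

def do_experiment(exp):
    # run the experiment `exp`; called once per item by `p.map`
    result = exp  # placeholder: replace with the real computation
    return result

if __name__ == '__main__':  # required where child processes are spawned (e.g. Windows)
    experiment_collection = range(10)  # stand-in for your real experiments
    p = Pool()  # automatically scales to the number of CPUs available
    results = p.map(do_experiment, experiment_collection)
    p.close()
    p.join()
```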