get*_*lad 6 python multithreading multiprocessing python-2.7
我正在将线程进程重写为多处理队列,以尝试加速大型计算。我已经完成了 95% 的任务,但是我不知道如何在Queue空时发出信号multiprocessing.
我原来的代码是这样的:
\n\nimport Queue\nfrom threading import Thread\n\nnum_fetch_threads = 4\nenclosure_queue = Queue()\n\nfor i in range(num_fetch_threads):\n worker = Thread(target=run_experiment, args=(i, enclosure_queue))\n worker.setDaemon(True)\n worker.start()\n\nfor experiment in experiment_collection:\n enclosure_queue.put((experiment, otherVar))\n\nenclosure_queue.join()\nRun Code Online (Sandbox Code Playgroud)\n\n队列函数如下:
\n\ndef run_experiment(i, q):\n while True:\n ... do stuff ...\n q.task_done()\nRun Code Online (Sandbox Code Playgroud)\n\n我的新代码是这样的:
\n\nfrom multiprocessing import Process, Queue\n\nnum_fetch_threads = 4\nenclosure_queue = Queue()\n\nfor i in range(num_fetch_threads):\n worker = Process(target=run_experiment, args=(i, enclosure_queue))\n worker.daemon = True\n worker.start()\n\nfor experiment in experiment_collection:\n enclosure_queue.put((experiment, otherVar))\n\nworker.join() ## I only put this here bc enclosure_queue.join() is not available\nRun Code Online (Sandbox Code Playgroud)\n\n以及新的队列函数:
\n\ndef run_experiment(i, q):\n while True:\n ... do stuff ...\n ## not sure what should go here\nRun Code Online (Sandbox Code Playgroud)\n\n我一直在阅读文档和谷歌,但无法弄清楚我错过了什么 - 我知道task_done/join不属于multiprocessing Queue课程的一部分,但不清楚我应该使用什么。
\n\n\n“它们的不同之处在于 Queue 缺少 Python 2.5\xe2\x80\x99s Queue.Queue 类中引入的 task_done() 和 join() 方法。” 来源
\n
但如果没有其中任何一个,我不确定队列如何知道它已完成,以及如何继续该程序。
\n考虑使用 amultiprocessing.Pool而不是手动管理工作人员。Pool 负责向 Worker 调度任务,具有 Map、Apply 等便捷功能,以及支持.close和.join方法。Pool负责处理进程之间的队列并处理结果。您的代码可能如下所示multiprocessing.Pool:
from multiprocessing import Pool
def do_experiment(exp):
# run the experiment `exp`, will be called by `p.map`
return result
p = Pool() # automatically scales to the number of CPUs available
results = p.map(do_experiment, experiment_collection)
p.close()
p.join()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
10693 次 |
| 最近记录: |