ale*_*xis 77 python queue parallel-processing multiprocessing python-multiprocessing
该multiprocessing
模块的文档显示了如何将队列传递给以multiprocessing.Process
.开头的进程.但是,如何与异步工作进程共享队列apply_async
?我不需要动态加入或其他任何东西,只是工人(反复)将结果报告回基地的一种方式.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
这失败了:
RuntimeError: Queue objects should only be shared between processes through inheritance
.我理解这意味着什么,我理解继承的建议,而不是要求pickle/unpickling(以及所有特殊的Windows限制).但如何做我传递队列中一个可行的办法?我找不到一个例子,我尝试了几种以各种方式失败的替代品.请帮忙?
end*_*ill 116
尝试使用multiprocessing.Manager来管理队列,并使其可供不同的工作人员访问.
import multiprocessing
def worker(name, que):
que.put("%d is done" % name)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
workers = pool.apply_async(worker, (33, q))
Run Code Online (Sandbox Code Playgroud)
multiprocessing.Pool
已经具有共享的结果队列,因此无需另外包含Manager.Queue
。Manager.Queue
是位于queue.Queue
后台的(多线程队列),位于单独的服务器进程上,并通过代理公开。与Pool的内部队列相比,这增加了额外的开销。与依赖于Pool的本机结果处理相反,Manager.Queue
也不能保证命令中的结果是有序的。
工作进程不是从开始的.apply_async()
,在实例化时已经发生Pool
。什么是当你调用开始pool.apply_async()
一个新的“工作”。Pool的工作进程在multiprocessing.pool.worker
后台运行-function。该函数负责处理通过Pool内部传输的新“任务”,Pool._inqueue
并通过将结果发送回父级Pool._outqueue
。您指定的func
将在内执行multiprocessing.pool.worker
。func
只需要return
做某事,结果将自动发送回父级。
.apply_async()
立即(异步)返回一个AsyncResult
对象(的别名ApplyResult
)。您需要.get()
在该对象上调用(正在阻止)以接收实际结果。另一种选择是注册一个回调函数,一旦结果准备就绪,就会触发该回调函数。
from multiprocessing import Pool
def busy_foo(i):
"""Dummy function simulating cpu-bound work."""
for _ in range(int(10e6)): # do stuff
pass
return i
if __name__ == '__main__':
with Pool(4) as pool:
print(pool._outqueue) # DEMO
results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
# `.apply_async()` immediately returns AsyncResult (ApplyResult) object
print(results[0]) # DEMO
results = [res.get() for res in results]
print(f'result: {results}')
Run Code Online (Sandbox Code Playgroud)
示例输出:
<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Run Code Online (Sandbox Code Playgroud)
注意:为指定timeout
-parameter .get()
不会停止在worker中实际执行任务,它只会通过引发来解除阻塞等待的父对象multiprocessing.TimeoutError
。