kal*_*gne 1 python pool multiprocessing python-2.7
我在池中使用多处理。我需要将结构作为参数传递给必须在单独的进程中使用的函数。我无法使用的映射功能multiprocessing.Pool,因为我无法复制Pool.Queue,也不能复制Pool.Array。该结构将在运行中用于记录每个终止过程的结果。这是我的代码:
import multiprocessing
from multiprocessing import Process, Manager, Queue, Array
import itertools
import time
def do_work(number, out_queue=None):
if out_queue is not None:
print "Treated nb ", number
out_queue.append("Treated nb " + str(number))
return 0
def multi_run_wrapper(iter_values):
return do_work(*iter_values)
def test_pool():
# Get the max cpu
nb_proc = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=nb_proc)
total_tasks = 16
tasks = range(total_tasks)
out_queue= Queue() # Use it instead of out_array and change out_queue.append() into out_queue.put() in the do_work() function.
out_array = Array('i', total_tasks)
iter_values = itertools.izip(tasks, itertools.repeat(out_array))
results = pool.map_async(multi_run_wrapper, iter_values)
pool.close()
pool.join()
print results._value
while not out_queue.empty():
print "queue: ", out_queue.get()
print "out array: \n", out_array
if __name__ == "__main__":
test_pool()
Run Code Online (Sandbox Code Playgroud)
我需要在一个分离的进程中启动一个worker,并将输出队列作为参数传递。我还想指定包含有限数量的正在运行的进程的池。为此,我正在使用该pool.map_async()功能。不幸的是,上面的代码给了我一个错误:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/queues.py", line 77, in __getstate__
assert_spawning(self)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/forking.py", line 52, in assert_spawning
' through inheritance' % type(self).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)
我相信这是因为Queue正如我在文档中所读到的那样,永远无法复制a 。然后我想到了将队列设为全局变量,这样就不再需要传递它了,但是在我看来,这太混乱了。我也想过用的multiprocessing.Array,而不是
out_array = Array('i', total_tasks)
Run Code Online (Sandbox Code Playgroud)
但是会出现与队列相同的错误:
# ...
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)
我需要在相对较大的软件中使用此功能-使用多处理和从子流程交换信息-所以我希望我的代码保持整洁。
如何以一种优雅的方式将队列传递给我的工人?
当然,欢迎使用任何其他处理主要规范的方法。
multiprocessing.Pool不会multiprocessing.Queue在其工作队列中接受a 作为参数。我相信这是因为它在内部使用队列将数据来回发送到工作进程。有几种解决方法:
1)您真的需要使用队列吗?该Pool功能的优点之一是它们的返回值被发送回主进程。通常,遍历池中的返回值比使用单独的队列要好。这也避免了通过检查引入竞争条件queue.empty()
2)如果必须使用Queue,则可以使用中的一个multiprocessing.Manager。这是共享队列的代理,可以作为参数传递给Pool函数。
3)您可以在创建时Queue使用初始化程序将法线传递给工作进程Pool(例如/sf/answers/269031941/)。这有点怪。
我上面提到的比赛条件来自:
while not out_queue.empty():
print "queue: ", out_queue.get()
Run Code Online (Sandbox Code Playgroud)
当工作进程填充队列时,您可能会遇到以下情况:队列当前为空,因为工作进程将要放入一些东西。如果您此时检查.empty(),将尽早结束。更好的方法是将标记值放入队列中,以在完成将数据放入队列时发出信号。
| 归档时间: |
|
| 查看次数: |
5471 次 |
| 最近记录: |