chr*_*amp · 2 · tags: python, queue, multiprocessing, python-3.x, concurrent.futures
When I run the following code:
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

q = Queue()

def my_task(x, queue):
    queue.put("Task Complete")
    return x


with ProcessPoolExecutor() as executor:
    tasks = [executor.submit(my_task, i, q) for i in range(10)]
    for task in as_completed(tasks):
        print(task.result())
I get this error:
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/tmp/nn.py", line 14, in <module>
    print(task.result())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
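What is the purpose of multiprocessing.Queue if I can't use it for multiprocessing? How can I make this work? In my actual code, I need each worker to frequently update the queue with its task status so that another thread can take data from that queue to feed a progress bar.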
Short Explanation
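Why can't you pass a multiprocessing.Queue as a worker function argument? In short, submitted tasks are placed on an internal input queue, invisible to you, from which the pool processes fetch the next task to execute. The task arguments must therefore be serializable with pickle, and a multiprocessing.Queue is in general not serializable. It is, however, serializable in the special case of being passed as an argument to a child process. The arguments to a multiprocessing.Process are stored as attributes of the instance when it is created. When start is called on the instance under the spawn (or forkserver) start method, the instance's state must be serialized to the new address space before the run method is called there; with fork, the child simply inherits a copy of the parent's memory. Why this serialization works in this case but not in general is unclear to me; I would have to spend a lot of time studying the interpreter's source code to give a definitive answer.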
See what happens when I try to put a queue instance on a queue:
>>> from multiprocessing import Queue
>>> q1 = Queue()
>>> q2 = Queue()
>>> q1.put(q2)
>>> Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
>>> import pickle
>>> b = pickle.dumps(q2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
>>>
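To make the distinction concrete, here is a toy pool built only from multiprocessing primitives. It is a simplified sketch, not how concurrent.futures is actually implemented, and the names toy_pool_map, worker and square are hypothetical. The two queues reach each worker as Process arguments, which is permitted, while every task item (function plus arguments) travels through task_queue and therefore must be picklable. That second path is where a Queue argument fails, as in the session above.

```python
from multiprocessing import Process, Queue


def square(x):
    return x * x


def worker(task_queue, result_queue):
    # Both queues arrived as Process arguments ("through inheritance"): allowed.
    # Each task item, however, was pickled by the parent and unpickled here.
    for func, args in iter(task_queue.get, None):  # None is the stop sentinel
        result_queue.put(func(*args))


def toy_pool_map(func, arg_tuples, n_workers=2):
    task_queue, result_queue = Queue(), Queue()
    workers = [Process(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for args in arg_tuples:
        # A Queue inside args could not be pickled at this point: the queue's
        # feeder thread would report the RuntimeError shown above.
        task_queue.put((func, args))
    for _ in workers:
        task_queue.put(None)  # one stop sentinel per worker
    # Read all results before joining so no worker blocks while flushing.
    results = [result_queue.get() for _ in arg_tuples]
    for w in workers:
        w.join()
    return sorted(results)  # workers finish in arbitrary order


if __name__ == "__main__":
    print(toy_pool_map(square, [(i,) for i in range(5)]))  # [0, 1, 4, 9, 16]
```

The real ProcessPoolExecutor adds work IDs, futures and error handling on top of this shape, but the constraint is the same: only what goes through the task queue has to survive pickle.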
How to Pass the Queue via Inheritance
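First of all, your code will run more slowly with multiprocessing than if you had simply called my_task in a loop, because multiprocessing introduces additional overhead (starting processes and moving data between address spaces). For it to pay off, the gains from running my_task in parallel must more than offset that overhead. In your case they don't, because my_task is not CPU-intensive enough to justify multiprocessing.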
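One rough way to check this on your own machine is to time the same trivial work both ways. This is an illustrative sketch rather than a benchmark: cheap_task, run_serial and run_pool are hypothetical names, cheap_task stands in for a lightweight task like my_task, and the timings depend on the machine, Python version and start method.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def cheap_task(x):
    # Stand-in for a lightweight task like my_task: almost no CPU work.
    return x


def run_serial(n):
    return [cheap_task(i) for i in range(n)]


def run_pool(n):
    # Process start-up plus pickling every argument and result is pure overhead here.
    with ProcessPoolExecutor() as executor:
        return list(executor.map(cheap_task, range(n)))


if __name__ == "__main__":
    n = 10
    for label, runner in (("serial", run_serial), ("pool", run_pool)):
        start = time.perf_counter()
        result = runner(n)
        elapsed = time.perf_counter() - start
        print(f"{label:>6}: {elapsed:.4f} s, {len(result)} results")
```

For work this light the pool version would normally be expected to lose; it only starts to win once each call does enough CPU-bound work to outweigh the start-up and data-transfer costs.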
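That said, when you want your pool processes to use a multiprocessing.Queue instance, it cannot be passed as an argument to a worker function (unlike the case where you explicitly use multiprocessing.Process instances instead of a pool). Instead, you must initialize a global variable in each pool process with the queue instance.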
If you are running on a platform that uses fork to create new processes, you can simply create queue as a global, and it will be inherited by each pool process:
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

queue = Queue()

def my_task(x):
    queue.put("Task Complete")
    return x

with ProcessPoolExecutor() as executor:
    tasks = [executor.submit(my_task, i) for i in range(10)]
    for task in as_completed(tasks):
        print(task.result())
    # This queue must be read before the pool terminates:
    for _ in range(10):
        print(queue.get())
Prints:
1
0
2
3
6
5
4
7
8
9
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
If you need portability to platforms that do not use the fork method to create processes, such as Windows (which uses the spawn method), you cannot create the queue as a global, because each pool process would create its own queue instance. Instead, the main process must create the queue and then initialize each pool process's global queue variable using the initializer and initargs arguments:
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

def init_pool_processes(q):
    global queue

    queue = q

def my_task(x):
    queue.put("Task Complete")
    return x

# Required for Windows compatibility:
if __name__ == '__main__':
    q = Queue()
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(q,)) as executor:
        tasks = [executor.submit(my_task, i) for i in range(10)]
        for task in as_completed(tasks):
            print(task.result())
        # This queue must be read before the pool terminates:
        for _ in range(10):
            print(q.get())
If you only want to advance a progress bar as each task completes (you have not said precisely how the bar is to be advanced; see my comment on your question), a queue is not necessary: the main process can advance the bar each time as_completed yields a finished future. If, however, each submitted task consists of N parts (10 * N parts in total, since there are 10 tasks) and you want a single progress bar to advance as each part completes, then a queue is probably the most straightforward way to signal part completions back to the main process.
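Here is a sketch of that per-part approach. It assumes a plain-text bar written to stdout instead of a progress-bar library, and it reads the queue in the main thread for simplicity; the same loop could run in a separate thread as in the question's design. n_parts, draw_bar, run_with_progress and BAR_WIDTH are hypothetical names; the queue reaches the workers through initializer/initargs, exactly as in the portable example.

```python
import sys
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue

BAR_WIDTH = 40  # hypothetical bar width in characters


def init_pool_processes(q):
    # Give each pool process a global reference to the main process's queue
    # (it travels via initargs, i.e. "through inheritance").
    global queue
    queue = q


def my_task(x, n_parts):
    for _ in range(n_parts):
        # ... one part of the real work would go here ...
        queue.put(1)  # report that one part has completed
    return x


def draw_bar(done, total):
    # Redraw a plain-text progress bar in place on stdout.
    filled = BAR_WIDTH * done // total
    sys.stdout.write(f"\r[{'#' * filled}{'.' * (BAR_WIDTH - filled)}] {done}/{total}")
    sys.stdout.flush()


def run_with_progress(n_tasks=10, n_parts=5):
    q = Queue()
    total = n_tasks * n_parts
    parts_done = 0
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(q,)) as executor:
        futures = [executor.submit(my_task, i, n_parts) for i in range(n_tasks)]
        # Drain the queue while the pool is still running, one bar step per part.
        while parts_done < total:
            parts_done += q.get()
            draw_bar(parts_done, total)
        sys.stdout.write("\n")
        results = sorted(f.result() for f in futures)
    return results, parts_done


if __name__ == "__main__":
    results, parts_done = run_with_progress()
    print(results, parts_done)
```

Because the queue is drained before the with block exits, the "read before the pool terminates" requirement is met. If a task can fail part-way, the loop above would wait forever for the missing parts; passing a timeout to q.get() is one way to guard against that.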