Why can't I use multiprocessing.Queue with ProcessPoolExecutor?

chr*_*amp 2 python queue multiprocessing python-3.x concurrent.futures

When I run the following code:

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

q = Queue()

def my_task(x, queue):
    queue.put("Task Complete")
    return x

with ProcessPoolExecutor() as executor:
    tasks = [executor.submit(my_task, i, q) for i in range(10)]
    for task in as_completed(tasks):
        print(task.result())

I get this error:

concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/tmp/nn.py", line 14, in <module>
    print(task.result())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/usr/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance

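What is the purpose of multiprocessing.Queue if I can't use it for multiprocessing? How can I make this work? In my real code, I need each worker to update a queue with its task status frequently, so that another thread can take the data from that queue to drive a progress bar.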

Boo*_*boo 5

Short explanation

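Why can't you pass a multiprocessing.Queue as an argument to a worker function? In short, submitted tasks are placed on a hidden input queue from which the pool processes fetch the next task to execute, so the task arguments must be serializable with pickle. A multiprocessing.Queue is generally not serializable. It is, however, serializable in the special case of being passed as an argument to a child process: the arguments of a multiprocessing.Process are stored as attributes of the instance when it is created, and when start is called on that instance, its state must be serialized to the new address space before the run method is called there. I have not studied the interpreter's source code closely enough to say definitively why serialization works in this case but not in general, but the traceback shows where the check is made: Queue.__getstate__ calls context.assert_spawning, which raises this RuntimeError unless the pickling takes place while a child process is being started.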
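For contrast, here is a minimal sketch (not part of the original answer) of the case that does work: handing a multiprocessing.Queue to an explicitly created multiprocessing.Process through args. The function names worker and main are illustrative only.

```python
from multiprocessing import Process, Queue


def worker(q):
    # The queue reaches the child as a Process argument. This is allowed
    # because the child either inherits it (fork) or receives it while it
    # is being started (spawn/forkserver).
    q.put("hello from child")


def main():
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    msg = q.get()  # read before join() so the child never blocks on a full pipe
    p.join()
    return msg


if __name__ == "__main__":
    print(main())
```

The if __name__ == "__main__": guard matters on platforms that use spawn, where the child re-imports the main module.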

Look at what happens when I try to put a queue instance on another queue:

>>> from multiprocessing import Queue
>>> q1 = Queue()
>>> q2 = Queue()
>>> q1.put(q2)
>>> Traceback (most recent call last):
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance

>>> import pickle
>>> b = pickle.dumps(q2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Program Files\Python38\lib\multiprocessing\queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 359, in assert_spawning
    raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance
>>>

How to pass the queue by inheritance

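First, your code will actually run more slowly than if you had simply called my_task in a loop, because multiprocessing introduces extra overhead (starting processes and moving data between address spaces). For multiprocessing to pay off, the gain from running my_task in parallel must more than offset that overhead. In your case it does not, because my_task is not CPU-intensive enough to justify multiprocessing.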

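That said, if you want your pool processes to use a multiprocessing.Queue instance, it cannot be passed as an argument to the worker function (unlike when you use multiprocessing.Process instances explicitly instead of a pool). Instead, you must initialize a global variable in each pool process with the queue instance.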
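The restriction above applies to multiprocessing.Queue itself. One alternative the answer does not cover, sketched here under the assumption that routing every put/get through an extra server process is acceptable, is a queue created by multiprocessing.Manager: it is a proxy object that can be pickled, so it can be passed as an ordinary task argument.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Manager


def my_task(x, queue):
    # queue is a proxy for a queue living in the manager's server process;
    # proxies are picklable, so it can travel as a submitted task argument.
    queue.put("Task Complete")
    return x


def main():
    with Manager() as manager:
        q = manager.Queue()
        with ProcessPoolExecutor() as executor:
            tasks = [executor.submit(my_task, i, q) for i in range(10)]
            results = sorted(task.result() for task in as_completed(tasks))
        # The queue lives in the manager process, so it can still be read
        # after the pool has shut down (but before the manager exits).
        messages = [q.get() for _ in range(10)]
    return results, messages


if __name__ == "__main__":
    results, messages = main()
    print(results)
    print(messages)
```

The trade-off is speed: each put and get becomes a round trip to the manager process, which is usually slower than a plain multiprocessing.Queue.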

If you are running on a platform that uses fork to create new processes, you can simply create queue as a global, and it will be inherited by each pool process:

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

queue = Queue()

def my_task(x):
    queue.put("Task Complete")
    return x

with ProcessPoolExecutor() as executor:
    tasks = [executor.submit(my_task, i) for i in range(10)]
    for task in as_completed(tasks):
        print(task.result())
    # This queue must be read before the pool terminates:
    for _ in range(10):
        print(queue.get())

Prints:

1
0
2
3
6
5
4
7
8
9
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete
Task Complete

If you need portability to platforms that do not use the fork method to create processes, such as Windows (which uses the spawn method), then you cannot create the queue as a global, because each pool process would create its own queue instance. Instead, the main process must create the queue and then initialize the global variable queue in each pool process using the initializer and initargs arguments:

from concurrent.futures import ProcessPoolExecutor, as_completed
from multiprocessing import Queue

def init_pool_processes(q):
    global queue

    queue = q

def my_task(x):
    queue.put("Task Complete")
    return x

# Required for Windows (spawn) compatibility
if __name__ == '__main__':
    q = Queue()

    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(q,)) as executor:
        tasks = [executor.submit(my_task, i) for i in range(10)]
        for task in as_completed(tasks):
            print(task.result())
        # This queue must be read before the pool terminates:
        for _ in range(10):
            print(q.get())

If you just want to advance a progress bar as each task completes (you haven't said exactly how the progress bar should advance; see my comment on your question), then the following shows that a queue is not necessary. If, however, each submitted task consists of N parts (10 * N parts in total, since there are 10 tasks) and you want to see a progress bar advance as each part completes, then a queue is probably the most straightforward way to signal part completion to the main process.
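The N-parts-per-task case, combined with the question's idea of a separate thread reading the queue, might be sketched as follows. This is an illustration under stated assumptions, not the original answer's code: N_TASKS, N_PARTS and progress_listener are hypothetical names, the "progress bar" is just a text counter, and the listener assumes no worker crashes (otherwise it would wait forever).

```python
import threading
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue

N_TASKS = 10  # hypothetical: number of submitted tasks
N_PARTS = 5   # hypothetical: parts per task


def init_pool_processes(q):
    # Give each pool process a global reference to the queue (spawn-safe).
    global queue
    queue = q


def my_task(x):
    for _ in range(N_PARTS):
        # ... one part of the real work would go here ...
        queue.put(1)  # signal that one part has finished
    return x


def progress_listener(q, total, progress):
    # Runs in a thread of the main process and counts completed parts.
    done = 0
    while done < total:
        q.get()
        done += 1
        progress.append(done)
        print(f"\rprogress: {done}/{total}", end="", flush=True)
    print()


def main():
    q = Queue()
    total = N_TASKS * N_PARTS
    progress = []
    listener = threading.Thread(target=progress_listener, args=(q, total, progress))
    listener.start()  # draining concurrently keeps the queue's pipe from filling up
    with ProcessPoolExecutor(initializer=init_pool_processes, initargs=(q,)) as executor:
        results = list(executor.map(my_task, range(N_TASKS)))
    listener.join()
    return results, progress


if __name__ == "__main__":
    main()
```

Because the listener thread consumes messages while the tasks are still running, the queue does not have to be read before the pool terminates as in the earlier examples.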