Python multiprocessing.Queue vs multiprocessing.manager().Queue()

nov*_*cef 33 python queue multiprocessing python-multiprocessing

我有一个简单的任务:

def worker(queue):
    while True:
        try:
            _ = queue.get_nowait()
        except Queue.Empty:
            break

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # queue = multiprocessing.Queue()
    queue = manager.Queue()

    for i in range(5):
        queue.put(i)

    processes = []

    for i in range(2):
        proc = multiprocessing.Process(target=worker, args=(queue,))
        processes.append(proc)
        proc.start()

    for proc in processes:
        proc.join()
Run Code Online (Sandbox Code Playgroud)

似乎multiprocessing.Queue可以完成我需要的所有工作,但另一方面我看到很多manager().Queue()的例子并且无法理解我真正需要的东西.看起来像Manager().Queue()使用某种代理对象,但我不明白这些目的,因为multiprocessing.Queue()在没有任何代理对象的情况下执行相同的工作.

所以,我的问题是:

1)multiprocessing.manager()返回的多处理.Queue和object之间有什么区别.Queue()?

2)我需要使用什么?

小智 26

虽然我对这个主题的理解是有限的,但从我的工作中我可以看出,multiprocessing.Queue()和multiprocessing.Manager()之间存在一个主要区别.Queue():

  • multiprocessing.Queue()是一个对象,而multiprocessing.Manager().Queue()是一个地址(代理),指向由multiprocessing.Manager()对象管理的共享队列.
  • 因此,您无法将正常的multiprocessing.Queue()对象传递给Pool方法,因为它无法进行pickle.
  • 此外,python doc告诉我们在使用multiprocessing.Queue()时要特别注意,因为它可能会产生不良影响

注意将对象放入队列时,将对该对象进行pickle,然后后台线程将pickle数据刷新到底层管道.这有一些后果有点令人惊讶,但不应该造成任何实际困难 - 如果他们真的打扰你,那么你可以改为使用由经理创建的队列.将对象放在空队列之后,在队列的empty()方法返回False之前可能存在无穷小的延迟,并且get_nowait()可以返回而不会引发Queue.Empty.如果多个进程将对象排入队列,则可能无序地在另一端接收对象.但是,由相同进程排队的对象将始终按预期的顺序相互关联.

警告如上所述,如果子进程已将项目放入队列(并且它未使用JoinableQueue.cancel_join_thread),则在将所有缓冲的项目刷新到管道之前,该进程不会终止.这意味着,如果您尝试加入该进程,则可能会遇到死锁,除非您确定已经使用了已放入队列的所有项目.类似地,如果子进程是非守护进程,则父进程在尝试加入其所有非守护进程子进程时可能会在退出时挂起.请注意,使用管理器创建的队列没有此问题.

通过将队列设置为全局变量并在初始化时为所有进程设置它,可以使用multiprocessing.Queue()和Pool一起使用:

queue = multiprocessing.Queue()
def initialize_shared(q):
    global queue
    queue=q

pool= Pool(nb_process,initializer=initialize_shared, initargs(queue,))
Run Code Online (Sandbox Code Playgroud)

将使用正确的共享队列创建池进程,但我们可以争辩说没有为此用途创建multiprocessing.Queue()对象.

另一方面,manager.Queue()可以通过将它作为函数的普通参数传递来在池子进程之间共享.

在我看来,使用multiprocessing.Manager().Queue()在每种情况下都很好,而且不那么麻烦.使用经理可能有一些缺点,但我不知道.

  • 据我所知,托管项目(队列、值等)的缺点是它们速度较慢。常规的多进程项是共享的,因此访问速度很快,但需要加锁保护。据我所知, multiprocess.manager 等效项是在其自己的流程中处理的实际项目的代理。因此,虽然管理器可以防止竞争条件,但这也意味着大量的进程间调用,这是昂贵的。因此,如果将有大量与共享项的通信,则非托管可能会更快(尽管更危险)。 (7认同)

Zol*_* K. 6

我最近遇到了一个问题Manager().Queue(),当SyncManager返回的对象multiprocessing.Manager()似乎死亡时,它管理的队列永远阻塞(即使使用*_nowait())。

我不确定原因,或者如果 SyncManager 真的死了,我唯一的线索是我multiprocessing.Manager()从一个类实例调用,该实例具有__del__(),它记录了调用它的进程,我可以看到这是__del__()从同步管理器进程。

这意味着我的对象在 SyncManager 进程中有一个副本,并且它被垃圾收集了。这可能意味着只有我的对象被删除,并且 SyncManager 很好,但我确实看到相应的队列变得无响应与__del__()SyncManager 进程中的调用相关。

我不知道我的对象如何最终进入 SyncManager 进程。我通常会抽出 50-200 名经理,其中一些具有重叠的生命周期,另一些则没有 - 直到我看到这个问题。对于解释器退出时存在的对象,__del__()不会被调用,并且我通常不会看到 SyncManager 对象因该日志而死亡__del__(),只是偶尔。可能当出现问题时,SyncManager 对象首先处理其对象,然后解释器才会退出,这就是为什么我__del__()有时会看到该调用。

我确实看到我的队列变得无响应,即使在我没有看到__del__()从 SyncManager 调用的情况下也是如此。

我还看到 SyncManager“死掉”而没有造成进一步的问题。

我所说的“无反应”是指:

queue.get(timeout=1)
queue.put(timeout=1)
Run Code Online (Sandbox Code Playgroud)

永远不会回来。

queue.get_nowait(timeout=1)
queue.put_nowait(timeout=1)
Run Code Online (Sandbox Code Playgroud)

永远不会回来。

这变得有点复杂,然后我本来想要的,但我把细节放进去,以防万一它对某人有帮助。

Manager().Queue()之前用过很长一段时间没有任何问题。我怀疑要么实例化大量管理器对象导致了问题,要么实例化大量管理器导致了一直存在的表面问题。

我用Python 3.6.5

  • 我仍在学习多重处理的所有复杂性,所以我不是 100% 确定,但[与 `__del__` 相关的问题可能与此处解释的类似](https://codewithoutrules.com/2017/08/16 /并发-python/)。据该帖子称,他们在“3.7”中修复了其中的一些问题。 (2认同)