子进程完成但仍未终止,从而导致死锁

tjm*_*tjm 6 python debugging deadlock multiprocessing python-3.x

好吧,因为目前没有答案,我不觉得这样做太糟糕了.虽然我仍然对幕后实际发生的问题感兴趣,但我最迫切的问题是更新2中指定的问题.

a JoinableQueue和a 之间有什么区别Manager().Queue()(什么时候应该使用另一个?).更重要的是,在这个例子中,替换另一个是否安全?


在下面的代码中,我有一个简单的进程池.每个进程都传递进程queue(pq)以从中提取要处理的数据,并返回值queue(rq)以将返回的处理值传递回主线程.如果我不附加到返回值队列,它会起作用,但是一旦我这样做,由于某种原因阻止进程停止.在这两种情况下,进程run方法都返回,因此它不在put返回队列阻塞上,但在第二种情况下进程本身不会终止,因此程序join在进程时死锁.为什么会这样?

更新:

  1. 它似乎与队列中的项目数有关.

    至少在我的机器上,我可以在队列中拥有多达6570个项目,它实际上可以工作,但不仅仅是这个而且它会死锁.

  2. 它似乎合作Manager().Queue().

    无论是限制JoinableQueue还是仅仅误解了两个对象之间的差异,我发现如果我用a替换返回队列Manager().Queue(),它会按预期工作.它们之间有什么区别,什么时候应该使用另一个?

  3. 如果我从rq

    Oop 消费,则不会发生错误.这里有一个答案,当我评论它时,它就消失了.无论如何,它所说的一件事就是质疑,如果我添加一个消费者,这个错误是否仍会发生.我试过这个,答案是,不,不是.

    它提到的另一件事是来自多处理文档的引用作为问题的可能关键.提到JoinableQueue的,它说:

    ...用于计算未完成任务数量的信号量最终可能会溢出引发异常.


import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while not dat is _ProcSTOP:
#            self._rq.put(dat)        # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done() 
        print('==', self.name)

    def __del__(self):
        print('--', self.name)

if __name__ == '__main__':

    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq) 
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
       pq.put(_ProcSTOP)

    pq.join()

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()    # hangs here (if using rq)

    print('** complete')
Run Code Online (Sandbox Code Playgroud)

示例输出,不使用返回队列:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-2
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>, <Proc(Proc-3, stopped)>]
-- Proc-3
?? [<Proc(Proc-1, stopped)>, <Proc(Proc-2, started)>]
-- Proc-2
?? [<Proc(Proc-1, stopped)>]
-- Proc-1
** complete
-- Proc-4
Run Code Online (Sandbox Code Playgroud)

样本输出,使用返回队列:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [<Proc(Proc-1, started)>, <Proc(Proc-2, started)>, <Proc(Proc-3, started)>, <Proc(Proc-4, started)>]
== Proc-3
# here it hangs
Run Code Online (Sandbox Code Playgroud)

Mih*_*tan 0

文档中:

警告

如上所述,如果子进程已将项目放入队列(并且尚未使用 JoinableQueue.cancel_join_thread()),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道。

这意味着,如果您尝试加入该进程,则可能会遇到死锁,除非您确定已放入队列的所有项目都已被消耗。类似地,如果子进程是非守护进程,则当父进程尝试加入其所有非守护进程子进程时,它可能会在退出时挂起。

请注意,使用管理器创建的队列不存在此问题。请参阅编程指南。

因此,JoinableQueue() 使用管道,并会等到它可以刷新所有数据后再关闭。

另一方面,Manager.Queue() 对象使用完全不同的方法。管理人员正在运行一个单独的进程,该进程立即接收所有数据(并将其存储在内存中)。

管理器提供了一种创建可在不同流程之间共享的数据的方法。管理器对象控制管理共享对象的服务器进程。其他进程可以使用代理访问共享对象。

...

Queue([maxsize]) 创建一个共享的 Queue.Queue 对象并为其返回一个代理。