python多处理 - 进程挂起大型队列的连接

use*_*424 23 python queue process multiprocessing

我正在运行python 2.7.3,我注意到以下奇怪的行为.考虑这个最小的例子:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()
    worker = Process(target=foo,args=(qin,qout))
    worker.start()

    for i in range(100000):
        print i
        sys.stdout.flush()
        qin.put(i**2)

    qin.put(None)
    worker.join()
Run Code Online (Sandbox Code Playgroud)

当我循环超过10,000或更多时,我的脚本会挂起worker.join().当循环仅达到1,000时,它工作正常.

有任何想法吗?

Arm*_*igo 34

qout子进程中的队列已满.您放入其中的数据foo()不适合内部使用的操作系统管道的缓冲区,因此子进程阻止尝试适应更多数据.但是父进程没有读取这些数据:它也被简单地阻塞,等待子进程完成.这是典型的死锁.

  • 如果您还提供了问题的代码解决方案,那就太棒了.即如何清空缓冲区,以便子进程不会阻塞. (13认同)

amd*_*amd 5

队列的大小必须有限制。考虑以下修改:

from multiprocessing import Process, Queue

def foo(qin,qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar':bar})

if __name__=='__main__':
    import sys

    qin=Queue()
    qout=Queue()   ## POSITION 1
    for i in range(100):
        #qout=Queue()   ## POSITION 2
        worker=Process(target=foo,args=(qin,))
        worker.start()
        for j in range(1000):
            x=i*100+j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'
Run Code Online (Sandbox Code Playgroud)

这按原样工作(qout.put注释掉了行)。如果您尝试保存所有 100000 个结果,则qout变得太大:如果我取消注释qout.put({'bar':bar})in foo,并保留qoutPOSITION 1 中的定义,则代码挂起。但是,如果我将qout定义移至 POSITION 2,则脚本完成。

所以简而言之,你必须小心,qin不要也不要qout变得太大。(另请参阅:多处理队列最大大小限制为 32767


Rug*_*nar 5

python3在尝试将字符串放入总大小约为 5000 cahrs 的队列时遇到了同样的问题。

在我的项目中有一个主机进程,它设置一个队列并启动子进程,然后加入。之后join主机进程从队列中读取。当子进程产生太多数据时,主机挂在join. 我使用以下函数修复了这个问题以等待宿主进程中的子进程:

from multiprocessing import Process, Queue
from queue import Empty

def yield_from_process(q: Queue, p: Process):
    while p.is_alive():
        p.join(timeout=1)
        while True:
            try:
                yield q.get(block=False)
            except Empty:
                break
Run Code Online (Sandbox Code Playgroud)

我在队列填满后立即从队列中读取,因此它永远不会变得非常大