Python 3在队列为空之前调用join时,多处理队列死锁

mar*_*rkk 20 python multithreading

我有一个问题,了解multiprocessingpython 3中模块中的队列

这就是他们在编程指南中所说的:

请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目由"feeder"线程提供给底层管道.(子进程可以调用队列的Queue.cancel_join_thread方法来避免此行为.)

这意味着无论何时使用队列,您都需要确保在加入进程之前最终删除已放入队列的所有项目.否则,您无法确定已将项目放入队列的进程将终止.还要记住,非守护进程会自动加入.

将导致死锁的示例如下:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的一个修复是交换最后两行(或简单地删除p.join()行).

所以显然,queue.get()不应该在之后调用join().

但是,有一些使用队列的例子,getjoin类似之后调用:

import multiprocessing as mp
import random
import string

# define a example function
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                string.ascii_lowercase
                + string.ascii_uppercase
                + string.digits)
    for i in range(length))
        output.put(rand_str)

 if __name__ == "__main__":
     # Define an output queue
     output = mp.Queue()

     # Setup a list of processes that we want to run
     processes = [mp.Process(target=rand_string, args=(5, output))
                    for x in range(2)]

     # Run processes
    for p in processes:
        p.start()

    # Exit the completed processes
    for p in processes:
        p.join()

    # Get process results from the output queue
    results = [output.get() for p in processes]

    print(results)
Run Code Online (Sandbox Code Playgroud)

我运行这个程序,它的工作原理(也作为StackOverFlow问题的解决方案发布Python 3 - 多处理 - Queue.get()没有响应).

有人可以帮我理解僵局的规则是什么吗?

Pat*_*pin 35

允许在进程之间传输数据的多处理中的队列实现依赖于标准OS管道.

操作系统管道不是无限长,因此在put()操作期间可以在操作系统中阻止排队数据的过程,直到某个其他进程用于get()从队列中检索数据.

对于少量数据,例如示例中的数据,主进程可以join()生成所有生成的子进程,然后获取数据.这通常效果很好,但不能扩展,并且不清楚何时会破坏.

但它肯定会打破大量数据.子put()进程将被阻塞,等待主进程从队列中删除一些数据get(),但主进程在join()等待子进程完成时被阻止.这导致死锁.

以下是用户遇到此问题的示例.我在答案中发布了一些代码,帮助他解决了问题.


Ale*_*din 6

join()在从共享队列中获取所有消息之前,不要调用进程对象。

我使用以下解决方法允许进程在处理所有结果之前退出:

results = []
while True:
    try:
        result = resultQueue.get(False, 0.01)
        results.append(result)
    except queue.Empty:
        pass
    allExited = True
    for t in processes:
        if t.exitcode is None:
            allExited = False
            break
    if allExited & resultQueue.empty():
        break
Run Code Online (Sandbox Code Playgroud)

它可以缩短,但我把它留得更久,以便新手更清楚。

resultQueuemultiprocess.Queuemultiprocess.Process对象共享的。在此代码块之后,您将获得result包含队列中所有消息的数组。

问题是接收消息的队列管道的输入缓冲区可能会变满,导致写入者无限阻塞,直到有足够的空间来接收下一条消息。所以你有三种方法可以避免阻塞:

  • 增加multiprocessing.connection.BUFFER尺寸(不太好)
  • 减少消息大小或其数量(不太好)
  • 收到消息时立即从队列中获取消息(好方法)