加入多处理队列需要很长时间

Dsc*_*oni 5 python queue python-multiprocessing

在 Python 2.7 中,我实现了一个具有多个队列和消费者的多处理场景。简化的想法是,我有一个作业的生产者,这些作业被提供给消费者,处理作业和一个错误处理程序,它负责所有的日志记录。非常简化,看起来都可以与之媲美:

import multiprocessing as mp
import Queue

job_queue = mp.Queue()
error_queue = mp.Queue()
for i in range(10):
    job_queue.put(i)

def job_handler(job_queue, error_queue):
    print 'Job handler'
    while True:
        try: 
            element = job_queue.get_nowait()
            print element
        except:
# t1
            error_queue.put('Error')
            error_queue.close()
            error_queue.join_thread()
            job_queue.close()
            job_queue.join_thread()
# t2
            return 1

def error_handler(error_queue):
    result = error_queue.get()
    if result == 'Error':
        error_queue.close()
        error_queue.join_thread()

if __name__ == '__main__':
    print 'Starting'
    p1 = mp.Process(target = error_handler, args = (error_queue, ))
    p1.start()
    p2 = mp.Process(target = job_handler, args = (job_queue, error_queue))
    p2.start()
Run Code Online (Sandbox Code Playgroud)

这基本上有效,但在我更复杂的程序中,两个评论点t1t2(大约 5 分钟)之间存在很长的时间差。所以我有两个问题:

  1. 难道我的理解是正确的,每一道工序应该调用close()join_thread()对所有使用队列对象,以表明它使用它们做了什么?我认为,当我结束它们时,子流程会隐含地这样做,例如通过返回此处所述

join_thread()加入后台线程。这只能在调用 close() 后使用。它会阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。进程可以调用cancel_join_thread() 使join_thread() 什么也不做。

  1. 我怎样才能弄清楚为什么加入过程需要这么长时间?

Bha*_*rel 2

致电.close().join_thread()是建议,但不是必须的。.close()当队列被垃圾回收时自动调用,并且.join_thread()在进程终止时自动调用。

不幸的是,我运行了你的代码,并在 5 秒后得到了一个漂亮的终止结果,并打印了 0-9。即使当我推送一个不可打印的字符时,我也没有收到任何延迟。该代码似乎运行流畅。

对于更复杂的程序,如果您通过队列传递大量数据,则可能会发生这种情况。队列用作IPC,这意味着数据在一侧编码,推入管道,并在另一侧解码。传递大量数据会导致速度减慢。由于它最终会自行解决,因此看起来并不像僵局。

尽管最好避免这种情况,但一个选择是使用共享内存而不是队列。这样,数据不需要在进程之间传递,而只需保留在两个进程共享的一个内存段中。