Python多处理队列:接收进程退出时该怎么办?

fel*_*pou 4 python queue multiprocessing

基本上我有以下代码:

import multiprocessing
import time

class MyProcess(multiprocessing.Process):

    def __init__(self, ):
        multiprocessing.Process.__init__(self)
        self.queue = multiprocessing.Queue()

    def run(self):
        print "Subprocess starting!"
        time.sleep(4)
        print "Subprocess exiting!"

    def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
        print "Added %d objects!" % count

        #self.queue.close()


if __name__ == "__main__":
    process = MyProcess()
    process.start()
    print "Waiting for a while"
    time.sleep(2)
    process.addToQueue()
    time.sleep(1)
    print "Child process state: %d" % process.is_alive()
Run Code Online (Sandbox Code Playgroud)

主进程完成后,它不会退出.什么都没发生,它只是阻止.我发现戒烟的唯一方法是杀死它(不是SIGTERM,SIGKILL).

如果我使用该注释行,它会退出但发出IOError.

我查看了multiprocessing.queue的代码,它使用了另一个线程(threading.Thread)中生成的os.pipe().我怀疑是线程在写入管道时阻塞,并且当使用close()方法时,它会引发IOError.

所以我的问题是:有更清洁的方法来处理这个问题吗?

我的意思是,我有这种情况,一直在写一个队列.当接收进程退出(干净或不干净)时,我应该关闭队列并在发送方进程上获得IOError?

编辑:进程的输出

Waiting for a while
Subprocess starting!
Adding stuff to queue...
Subprocess exiting!
Added 1822174 objects!
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
Child process state: 0
Run Code Online (Sandbox Code Playgroud)

此部分仅在使用注释的self.queue.close()时发生:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed
    send(obj)
IOError: [Errno 32] Broken pipe
Run Code Online (Sandbox Code Playgroud)

fel*_*pou 10

我正在回答我自己的问题,因为不是每个人都会阅读评论.在评论中来自用户mata的提示后,我测试了问题中的示例代码,在time.sleep(0.01)循环内部添加了一个调用,将对象添加到队列中,因此我可以限制将添加到队列中的对象数量:

def addToQueue(self):
        starttime = time.time()
        count=0
        print "Adding stuff to queue..."
        while time.time()-starttime  < 4:
            self.queue.put("string")
            count += 1
            time.sleep(0.01)
        print "Added %d objects!" % count
Run Code Online (Sandbox Code Playgroud)

因此,当对象数量较少(在此示例中小于3800)时,该过程正常退出.但是当有很多对象时,进程之间的管道似乎有一些锁定.

但这对我提出了另一个问题:这是一个错误吗?我应该报告吗?或者这只是正常的预期行为?

非常感谢用户mata指出这种可能性!