多处理错误的管道错误.Queue

hAc*_*oCk 7 python parallel-processing multiprocessing python-2.7

在python2.7中,multiprocessing.Queue在从函数内部初始化时抛出一个破坏的错误.我提供了一个重现问题的最小例子.

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

抛出下面破裂的管道错误

Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)

我无法破译原因.我们无法从函数内部填充Queue对象,这当然很奇怪.

小智 11

当您启动Queue.put()时,将启动隐式线程以将数据传递到队列.同时,主应用程序完成,数据没有结束站(队列对象被垃圾收集).

我会试试这个:

from multiprocessing import Queue

def main():
    q = Queue()
    for i in range(10):
        print i
        q.put(i)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

join_thread()确保缓冲区中的所有数据都已刷新.close()必须先叫join_thread()


CoM*_*tel 7

这里发生的是,当您调用时main(),它将创建Queue,在其中放置10个对象并结束该函数,并垃圾回收其所有内部变量和对象,包括Queue。但您会收到此错误,因为您仍在尝试发送中的最后一个号码Queue

从文档文档中

“当进程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。”

正如put()在另一个线程中所做的那样,它不会阻止脚本的执行,并允许main()在完成Queue操作之前结束该函数。

尝试这个 :

#!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()
Run Code Online (Sandbox Code Playgroud)

join将对象放入之前,应该有一种方法来执行队列或块执行Queue,您应该查看文档。