hAc*_*oCk 7 python parallel-processing multiprocessing python-2.7
在python2.7中,multiprocessing.Queue在从函数内部初始化时抛出一个破坏的错误.我提供了一个重现问题的最小例子.
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
def main():
q = multiprocessing.Queue()
for i in range(10):
q.put(i)
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
抛出下面破裂的管道错误
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe
Process finished with exit code 0
Run Code Online (Sandbox Code Playgroud)
我无法破译原因.我们无法从函数内部填充Queue对象,这当然很奇怪.
小智 11
当您启动Queue.put()时,将启动隐式线程以将数据传递到队列.同时,主应用程序完成,数据没有结束站(队列对象被垃圾收集).
我会试试这个:
from multiprocessing import Queue
def main():
q = Queue()
for i in range(10):
print i
q.put(i)
q.close()
q.join_thread()
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
join_thread()确保缓冲区中的所有数据都已刷新.close()必须先叫join_thread()
这里发生的是,当您调用时main(),它将创建Queue,在其中放置10个对象并结束该函数,并垃圾回收其所有内部变量和对象,包括Queue。但您会收到此错误,因为您仍在尝试发送中的最后一个号码Queue。
从文档文档中:
“当进程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。”
正如put()在另一个线程中所做的那样,它不会阻止脚本的执行,并允许main()在完成Queue操作之前结束该函数。
尝试这个 :
#!/usr/bin/python
# -*- coding: utf-8 -*-
import multiprocessing
import time
def main():
q = multiprocessing.Queue()
for i in range(10):
print i
q.put(i)
time.sleep(0.1) # Just enough to let the Queue finish
if __name__ == "__main__":
main()
Run Code Online (Sandbox Code Playgroud)
在join将对象放入之前,应该有一种方法来执行队列或块执行Queue,您应该查看文档。