eri*_*rik 3 python multiprocessing
当试图把一个大ndarray的Queue放入一个Process,我遇到以下问题:
首先,这是代码:
import numpy
import multiprocessing
from ctypes import c_bool
import time
def run(acquisition_running, data_queue):
while acquisition_running.value:
length = 65536
data = numpy.ndarray(length, dtype='float')
data_queue.put(data)
time.sleep(0.1)
if __name__ == '__main__':
acquisition_running = multiprocessing.Value(c_bool)
data_queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=run, args=(acquisition_running, data_queue))
acquisition_running.value = True
process.start()
time.sleep(1)
acquisition_running.value = False
process.join()
print('Finished')
number_items = 0
while not data_queue.empty():
data_item = data_queue.get()
number_items += 1
print(number_items)
Run Code Online (Sandbox Code Playgroud)
如果我使用length=10左右,一切正常.我通过队列传输了9个项目.
如果我length=1000在我的计算机上增加process.join()块,虽然功能run()已经完成.我可以评论该行,process.join()并将看到,在队列中只放置了2个项目,因此显然将数据放入队列变得非常慢.
我的计划实际上是传输4个ndarray,每个长度为65536.因为Thread这个工作非常快(<1ms).有没有办法提高流程传输数据的速度?
我在Windows机器上使用Python 3.4,但在Linux上使用Python 3.4我得到了相同的行为.
"有没有办法提高流程传输数据的速度?"
当然,给予正确的解决问题.目前,您只需填充缓冲区而不会同时清空缓冲区.恭喜你,你刚刚建立了一个所谓的死锁.文档中的相应引用是:
请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲的项目由"feeder"线程提供给底层管道.
但是,让我们慢慢接近这个.首先,"速度"不是你的问题!我知道你只是在试验Python multiprocessing.阅读代码时最重要的见解是父母和孩子之间的沟通流程,特别是事件处理并没有多大意义.如果你有一个你想要解决的现实世界的问题,你肯定无法通过这种方式解决它.如果你没有现实问题,那么在开始编写代码之前,首先需要提出一个好问题;-).最后,您需要了解操作系统为进程间通信提供的通信原语.
您正在观察的内容的说明:
你的孩子的过程产生关于10 * length * size(float)数据的字节数(考虑,虽然你的父母睡约1秒将其设置之前,你的孩子进程可以执行大约10次迭代的事实acquisition_running来False).当您的父进程休眠时,子进程将命名的数据量放入队列中.您需要了解队列是一个复杂的构造.你不需要了解它的每一点.但有一件事确实非常重要:进程间通信的队列显然使用了父进程和子进程之间的某种缓冲区*.缓冲区通常具有有限的尺寸.您正在从子内部写入此缓冲区,而不是同时从父级中读取它.也就是说,缓冲内容在父母刚睡觉时稳定增长.通过增加length您遇到队列缓冲区已满并且子进程无法再写入它的情况.但是,子进程在写入所有数据之前无法终止.同时,父进程等待子进程终止.
你看?一个实体等待另一个实体.父母等待孩子终止,孩子等待父母留出一些空间.这种情况称为死锁.它无法解决自己.
关于细节,缓冲器情况比上述情况稍微复杂一些.您的子进程产生了一个额外的线程,它试图通过管道将缓冲的数据推送到父级.实际上,这个管道的缓冲区是限制实体.它由操作系统定义,至少在Linux上,通常不大于65536字节.
换句话说,基本部分是:在子项完成尝试写入管道之前,父项不会从管道中读取.在使用管道的每个有意义的场景中,读取和写入以相当同时的方式发生,以便一个过程可以快速响应另一个过程提供的输入.你做的恰恰相反:你让你的父母睡觉,因此让它对孩子的输入不敏感,导致死锁情况.
(*)" 当一个进程首先将一个项目放入队列时,就会启动一个从缓冲区将对象传输到管道中的支线线程 ",来自https://docs.python.org/2/library/multiprocessing.html