Fel*_*lix 2 python queue multiprocessing dataframe python-multiprocessing
我正在使用multiprocessingpython模块.我有大约20-25个任务同时运行.每个任务将创建一个pandas.DataFrame~20k行的对象.问题是,所有任务都执行得很好,但是当谈到"加入"流程时,它就会停止.我尝试过"小"DataFrames并且效果很好.为了说明我的观点,我在下面创建了代码.
import pandas
import multiprocessing as mp
def task(arg, queue):
DF = pandas.DataFrame({"hello":range(10)}) # try range(1000) or range(10000)
queue.put(DF)
print("DF %d stored" %arg)
listArgs = range(20)
queue = mp.Queue()
processes = [mp.Process(target=task,args=(arg,queue)) for arg in listArgs]
for p in processes:
p.start()
for i,p in enumerate(processes):
print("joining %d" %i)
p.join()
results = [queue.get() for p in processes]
Run Code Online (Sandbox Code Playgroud)
编辑:
使用DF = pandas.DataFrame({"hello":range(10)})我有一切正确:"DF 0存储"直到"DF 19存储",与"加入0"到"加入19"相同.
然而,DF = pandas.DataFrame({"hello":range(1000)})问题出现了:当它存储DF时,加入步骤在"加入3"之后停止.
感谢有用的提示:)
在管道和队列下的文档中解释了此问题:
警告:如上所述,如果子进程已将项目放入队列(并且尚未使用
JoinableQueue.cancel_join_thread),则在将所有已缓冲的项目刷新到管道之前,该进程不会终止.这意味着,如果您尝试加入该进程,则可能会遇到死锁,除非您确定已经使用了已放入队列的所有项目.类似地,如果子进程是非守护进程,则父进程在尝试加入其所有非守护进程子进程时可能会在退出时挂起.
请注意,使用管理器创建的队列没有此问题.请参阅编程指南.
使用经理可以工作,但有很多更简单的方法来解决这个问题:
Queue手动管理(例如,使用JoinableQueue和task_done).Pool.map而不是重新发明轮子.(是的,Pool对于您的用例来说,大部分内容都不是必需的 - 但它也不会妨碍,而且好处是,您已经知道它有效.)我不会展示#1的实现,因为它是如此微不足道,或者#2因为它是如此痛苦,但对于#3:
def task(arg):
DF = pandas.DataFrame({"hello":range(1000)}) # try range(1000) or range(10000)
return DF
with mp.Pool(processes=20) as p:
results = p.map(task, range(20), chunksize=1)
Run Code Online (Sandbox Code Playgroud)
(在2.7中,Pool可能无法在with语句中工作;您可以将更高版本的端口安装multiprocessing回到2.7关闭PyPI,或者您可以手动创建池,然后close在try/中finally,只需要处理文件,如果它没有在with声明中工作...)
您可能会问自己,为什么它在这一点上确实失败了,但是使用较小的数字 - 甚至只是更小一些?
这个DataFrame的泡菜刚刚超过16K.(列表本身稍微小一些,但是如果用10000而不是1000来尝试它,你应该在没有Pandas的情况下看到相同的东西.)
所以,第一个孩子写16K,然后阻塞,直到有空间写最后几百个字节.但是你没有从管道上取下任何东西(通过调用queue.get)直到它之后join,你不能join直到它们退出,直到你解开管道它们才能做到,所以这是一个典型的死锁.有足够的空间让前4个通过,但没有5个空间.因为你有4个核心,大部分时间,前4个通过将是第4个.但偶尔#4将击败#3或者其他东西,然后你将无法加入#3.使用8核机器会更频繁地发生这种情况.
| 归档时间: |
|
| 查看次数: |
1931 次 |
| 最近记录: |