我对python很新.我正在使用多处理模块读取stdin上的文本行,以某种方式转换它们并将它们写入数据库.这是我的代码片段:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
现在一切正常,直到我处理我输入我的python程序的巨大输入文件(数亿行).在某些时候,当我的数据库变慢时,我看到内存已满.
经过一番播放后,事实证明pool.apply_async以及pool.map_async永远不会阻塞,因此要处理的调用队列越来越大.
我的问题的正确方法是什么?我希望我能设置一个参数,一旦达到某个队列长度,就会阻塞pool.apply_async调用.Java中的AFAIR可以为ThreadPoolExecutor提供一个具有固定长度的BlockingQueue用于此目的.
谢谢!
python queue design-patterns multiprocessing python-multiprocessing