Rya*_*son 7 python parallel-processing iterator pipeline
假设我有一些Python代码,如下所示:
input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)
Run Code Online (Sandbox Code Playgroud)
此代码从输入文件中读取每一行,通过多个函数运行它,并将输出写入输出文件.现在我知道函数process_line
,process_item
并且generate_output_line
永远不会相互干扰,让我们假设输入和输出文件位于不同的磁盘上,这样读写就不会相互干扰.
但Python可能不知道这些.我的理解是Python将读取一行,依次应用每个函数,并将结果写入输出,然后只有在将第一行发送到输出后才会读取第二行,这样第二行就不会进入管道直到第一个退出.我是否正确理解该程序将如何流动?如果这是它的工作方式,是否有任何简单的方法可以使多个行同时在管道中,以便程序并行读取,写入和处理每个步骤?
您无法真正并行化读取或写入文件; 这些将是你的瓶颈,最终.你确定你的瓶颈是CPU,而不是I/O吗?
由于您的处理不包含依赖项(根据您的意愿),因此使用Python的multiprocessing.Pool类非常简单.
有几种方法可以写这个,但更简单的wrt调试是找到独立的关键路径(代码的最慢部分),我们将使它们并行运行.我们假设它是process_item.
......实际上就是这样.码:
import multiprocessing.Pool
p = multiprocessing.Pool() # use all available CPUs
input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)
Run Code Online (Sandbox Code Playgroud)
我没有测试过,但这是基本的想法.Pool的imap方法确保以正确的顺序返回结果.