如何在Python中并行化生成器/迭代器管道?

Rya*_*son 7 python parallel-processing iterator pipeline

假设我有一些Python代码,如下所示:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)
Run Code Online (Sandbox Code Playgroud)

此代码从输入文件中读取每一行,通过多个函数运行它,并将输出写入输出文件.现在知道函数process_line,process_item并且generate_output_line永远不会相互干扰,让我们假设输入和输出文件位于不同的磁盘上,这样读写就不会相互干扰.

但Python可能不知道这些.我的理解是Python将读取一行,依次应用每个函数,并将结果写入输出,然后只有将第一行发送到输出才会读取第二行,这样第二行就不会进入管道直到第一个退出.我是否正确理解该程序将如何流动?如果这是它的工作方式,是否有任何简单的方法可以使多个行同时在管道中,以便程序并行读取,写入和处理每个步骤?

Sam*_*ain 5

您无法真正并行化读取或写入文件; 这些将是你的瓶颈,最终.你确定你的瓶颈是CPU,而不是I/O吗?

由于您的处理不包含依赖项(根据您的意愿),因此使用Python的multiprocessing.Pool类非常简单.

有几种方法可以写这个,但更简单的wrt调试是找到独立的关键路径(代码的最慢部分),我们将使它们并行运行.我们假设它是process_item.

......实际上就是这样.码:

import multiprocessing.Pool

p = multiprocessing.Pool() # use all available CPUs

input = open("input.txt")
x = (process_line(line) for line in input)
y = p.imap(process_item, x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)
Run Code Online (Sandbox Code Playgroud)

我没有测试过,但这是基本的想法.Pool的imap方法确保以正确的顺序返回结果.