python 使用多进程过滤海量文件

sim*_*ack 5 python multiprocessing

我正在尝试并行化文件过滤操作,其中每个过滤器都是一个大的正则表达式,因此整个操作需要时间来运行。文件本身大约有 100GB。单进程版本如下所示:

def func(line):
    # simple function as an example
    for i in range(10**7):
        pass
    return len(line) % 2 == 0


with open('input.txt') as in_sr, open('output.txt', 'w') as out_sr:
    for line in input:
        if func(line):
            out_sr.write(line)
Run Code Online (Sandbox Code Playgroud)

我尝试使用multiprocessing's ,imap但这让ValueError: I/O operation on closed file.我认为迭代器被复制到每个进程,但并非所有进程都打开该句柄。

有没有办法使用multiprocessing(最好是使用池)来做到这一点?

Jon*_*Jon 3

我可以运行以下代码而不会出现错误。确保您没有在声明之外in_sr打电话。out_srwith

from multiprocessing import Pool

def func(line):
    # simple function as an example
    for i in xrange(10**7):
        pass
    return len(line) % 2 == 0, line

def main():
    with open('input.txt','r') as in_sr, open('output.txt', 'w') as out_sr:
        pool = Pool(processes=4)
        for ret,line in pool.imap(func, in_sr, chunksize=4):
            if ret:
                out_sr.write(line)
        pool.close()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)