从大文件中分块数据进行多处理?

use*_*625 16 python parallel-processing

我正在尝试使用多处理并行化应用程序,该应用程序接收一个非常大的csv文件(64MB到500MB),一些工作逐行,然后输出一个小的,固定大小的文件.

目前我做了一个list(file_obj),不幸的是,它被完全加载到内存中(我认为)然后我将该列表分成n个部分,n是我想要运行的进程数.然后我pool.map()在破碎的清单上做了一个.

与单线程,只是打开文件和迭代的方法相比,这似乎有一个非常非常糟糕的运行时.有人可以建议更好的解决方案?

另外,我需要以组的形式处理文件的行,这些行保留了某个列的值.这些行组本身可以拆分,但任何组都不应包含此列的多个值.

unu*_*tbu 16

list(file_obj)很大时可能需要大量内存fileobj.我们可以通过使用itertools来根据需要提取行块来减少内存需求.

特别是我们可以使用

reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
Run Code Online (Sandbox Code Playgroud)

将文件拆分为可处理的块,以及

groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
Run Code Online (Sandbox Code Playgroud)

使多处理池num_chunks一次在块上工作.

通过这样做,我们只需要足够的内存来在内存中保存一些(num_chunks)块,而不是整个文件.


import multiprocessing as mp
import itertools
import time
import csv

def worker(chunk):
    # `chunk` will be a list of CSV rows all with the same name column
    # replace this with your real computation
    # print(chunk)
    return len(chunk)  

def keyfunc(row):
    # `row` is one row of the CSV file.
    # replace this with the name column.
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'test.dat'
    num_chunks = 10
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
    print(results)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)