use*_*625 16 python parallel-processing
我正在尝试使用多处理并行化应用程序,该应用程序接收一个非常大的csv文件(64MB到500MB),一些工作逐行,然后输出一个小的,固定大小的文件.
目前我做了一个list(file_obj)
,不幸的是,它被完全加载到内存中(我认为)然后我将该列表分成n个部分,n是我想要运行的进程数.然后我pool.map()
在破碎的清单上做了一个.
与单线程,只是打开文件和迭代的方法相比,这似乎有一个非常非常糟糕的运行时.有人可以建议更好的解决方案?
另外,我需要以组的形式处理文件的行,这些行保留了某个列的值.这些行组本身可以拆分,但任何组都不应包含此列的多个值.
unu*_*tbu 16
list(file_obj)
很大时可能需要大量内存fileobj
.我们可以通过使用itertools来根据需要提取行块来减少内存需求.
特别是我们可以使用
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
Run Code Online (Sandbox Code Playgroud)
将文件拆分为可处理的块,以及
groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)]
result = pool.map(worker, groups)
Run Code Online (Sandbox Code Playgroud)
使多处理池num_chunks
一次在块上工作.
通过这样做,我们只需要足够的内存来在内存中保存一些(num_chunks
)块,而不是整个文件.
import multiprocessing as mp
import itertools
import time
import csv
def worker(chunk):
# `chunk` will be a list of CSV rows all with the same name column
# replace this with your real computation
# print(chunk)
return len(chunk)
def keyfunc(row):
# `row` is one row of the CSV file.
# replace this with the name column.
return row[0]
def main():
pool = mp.Pool()
largefile = 'test.dat'
num_chunks = 10
results = []
with open(largefile) as f:
reader = csv.reader(f)
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
print(results)
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
10724 次 |
最近记录: |