11 python csv numpy multiprocessing python-multiprocessing
我正在使用以下代码将CSV文件拆分为多个块(源自此处)
def worker(chunk):
print len(chunk)
def keyfunc(row):
return row[0]
def main():
pool = mp.Pool()
largefile = 'Counseling.csv'
num_chunks = 10
start_time = time.time()
results = []
with open(largefile) as f:
reader = csv.reader(f)
reader.next()
chunks = itertools.groupby(reader, keyfunc)
while True:
# make a list of num_chunks chunks
groups = [list(chunk) for key, chunk in
itertools.islice(chunks, num_chunks)]
if groups:
result = pool.map(worker, groups)
results.extend(result)
else:
break
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)
但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.
请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!
6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
Run Code Online (Sandbox Code Playgroud)
unu*_*tbu 12
根据评论,我们希望每个进程都能在10000行上运行.这不是很难做到的; 看iter/islice
下面的食谱.但是,使用的问题
pool.map(worker, ten_thousand_row_chunks)
Run Code Online (Sandbox Code Playgroud)
是否pool.map
会尝试将所有块一次性放入任务队列中.如果这需要比可用内存更多的内存,那么你得到一个
MemoryError
.(注意:pool.imap
遇到同样的问题.)
所以相反,我们需要pool.map
迭代地调用每个块的片段.
import itertools as IT
import multiprocessing as mp
import csv
def worker(chunk):
return len(chunk)
def main():
# num_procs is the number of workers in the pool
num_procs = mp.cpu_count()
# chunksize is the number of lines in a chunk
chunksize = 10**5
pool = mp.Pool(num_procs)
largefile = 'Counseling.csv'
results = []
with open(largefile, 'rb') as f:
reader = csv.reader(f)
for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
chunk = iter(chunk)
pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
result = pool.map(worker, pieces)
results.extend(result)
print(results)
pool.close()
pool.join()
main()
Run Code Online (Sandbox Code Playgroud)
每个chunk
都包含chunksize*num_procs
文件中的行.这是足够的数据,可以为池中的所有工作人员提供一些工作,但不会太大而导致MemoryError - 提供的设置chunksize
不是太大.
chunk
然后将每个碎片分成碎片,每个碎片由chunksize
文件中的最多行组成
.然后将这些作品发送给pool.map
.
如何iter(lambda: list(IT.islice(iterator, chunksize)), [])
工作:
这是将迭代器分组为长度为chunksize的块的习惯用法.让我们看看它如何在一个例子中起作用:
In [111]: iterator = iter(range(10))
Run Code Online (Sandbox Code Playgroud)
请注意,每次IT.islice(iterator, 3)
调用时,都会从迭代器中切掉3个项目的新块:
In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]
In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]
In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
Run Code Online (Sandbox Code Playgroud)
当迭代器中剩余的项目少于3个时,只返回剩余的项目:
In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
Run Code Online (Sandbox Code Playgroud)
如果再次调用它,则会得到一个空列表:
In [116]: list(IT.islice(iterable, 3))
Out[116]: []
Run Code Online (Sandbox Code Playgroud)
lambda: list(IT.islice(iterator, chunksize))
是一个list(IT.islice(iterator, chunksize))
在被调用时返回的函数.这是一个"单行",相当于
def func():
return list(IT.islice(iterator, chunksize))
Run Code Online (Sandbox Code Playgroud)
最后,iter(callable, sentinel)
返回另一个迭代器.此迭代器产生的值是callable返回的值.它继续产生值,直到callable返回一个等于sentinel的值.所以
iter(lambda: list(IT.islice(iterator, chunksize)), [])
Run Code Online (Sandbox Code Playgroud)
将继续返回值,list(IT.islice(iterator, chunksize))
直到该值为空列表:
In [121]: iterator = iter(range(10))
In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Run Code Online (Sandbox Code Playgroud)
首先,如果记录尚未在关键列上排序,则itertools.groupby将没有任何实际意义.此外,如果您的要求只是将csv文件块化为预定数量的行并将其提供给工作者,那么您不必执行所有这些操作.
一个简单的实现将是:
import csv
from multiprocessing import Pool
def worker(chunk):
print len(chunk)
def emit_chunks(chunk_size, file_path):
lines_count = 0
with open(file_path) as f:
reader = csv.reader(f)
chunk = []
for line in reader:
lines_count += 1
chunk.append(line)
if lines_count == chunk_size:
lines_count = 0
yield chunk
chunk = []
else:
continue
if chunk : yield chunk
def main():
chunk_size = 10
gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
p = Pool(5)
p.imap(worker, gen)
print 'Completed..'
Run Code Online (Sandbox Code Playgroud)
*编辑:更改为pool.imap而不是pool.map