Python Chunking CSV文件多处理

11 python csv numpy multiprocessing python-multiprocessing

我正在使用以下代码将CSV文件拆分为多个块(源自此处)

def worker(chunk):
    print len(chunk)

def keyfunc(row):
    return row[0]

def main():
    pool = mp.Pool()
    largefile = 'Counseling.csv'
    num_chunks = 10
    start_time = time.time()
    results = []
    with open(largefile) as f:
        reader = csv.reader(f)
        reader.next()
        chunks = itertools.groupby(reader, keyfunc)
        while True:
            # make a list of num_chunks chunks
            groups = [list(chunk) for key, chunk in
                      itertools.islice(chunks, num_chunks)]
            if groups:
                result = pool.map(worker, groups)
                results.extend(result)
            else:
                break
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

但是,无论我选择使用多少个块,似乎块的数量始终保持不变.例如,无论我选择有1个还是10个块,我总是在处理样本文件时获得此输出.理想情况下,我想将文件分块以便公平分配.

请注意,我正在分块的真实文件超过1300万行,这就是为什么我要一块一块地处理它.这是必须的!

6
7
1
...
1
1
94
--- 0.101687192917 seconds ---
Run Code Online (Sandbox Code Playgroud)

unu*_*tbu 12

根据评论,我们希望每个进程都能在10000行上运行.这不是很难做到的; 看iter/islice下面的食谱.但是,使用的问题

pool.map(worker, ten_thousand_row_chunks)
Run Code Online (Sandbox Code Playgroud)

是否pool.map会尝试将所有块一次性放入任务队列中.如果这需要比可用内存更多的内存,那么你得到一个 MemoryError.(注意:pool.imap 遇到同样的问题.)

所以相反,我们需要pool.map迭代地调用每个块的片段.

import itertools as IT
import multiprocessing as mp
import csv

def worker(chunk):
    return len(chunk)

def main():
    # num_procs is the number of workers in the pool
    num_procs = mp.cpu_count()
    # chunksize is the number of lines in a chunk
    chunksize = 10**5

    pool = mp.Pool(num_procs)
    largefile = 'Counseling.csv'
    results = []
    with open(largefile, 'rb') as f:
        reader = csv.reader(f)
        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []):
            chunk = iter(chunk)
            pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), []))
            result = pool.map(worker, pieces)
            results.extend(result)
    print(results)
    pool.close()
    pool.join()

main()
Run Code Online (Sandbox Code Playgroud)

每个chunk都包含chunksize*num_procs文件中的行.这是足够的数据,可以为池中的所有工作人员提供一些工作,但不会太大而导致MemoryError - 提供的设置chunksize不是太大.

chunk然后将每个碎片分成碎片,每个碎片由chunksize文件中的最多行组成 .然后将这些作品发送给pool.map.


如何iter(lambda: list(IT.islice(iterator, chunksize)), [])工作:

这是将迭代器分组为长度为chunksize的块的习惯用法.让我们看看它如何在一个例子中起作用:

In [111]: iterator = iter(range(10))
Run Code Online (Sandbox Code Playgroud)

请注意,每次IT.islice(iterator, 3)调用时,都会从迭代器中切掉3个项目的新块:

In [112]: list(IT.islice(iterator, 3))
Out[112]: [0, 1, 2]

In [113]: list(IT.islice(iterator, 3))
Out[113]: [3, 4, 5]

In [114]: list(IT.islice(iterator, 3))
Out[114]: [6, 7, 8]
Run Code Online (Sandbox Code Playgroud)

当迭代器中剩余的项目少于3个时,只返回剩余的项目:

In [115]: list(IT.islice(iterator, 3))
Out[115]: [9]
Run Code Online (Sandbox Code Playgroud)

如果再次调用它,则会得到一个空列表:

In [116]: list(IT.islice(iterable, 3))
Out[116]: []
Run Code Online (Sandbox Code Playgroud)

lambda: list(IT.islice(iterator, chunksize))是一个list(IT.islice(iterator, chunksize))在被调用时返回的函数.这是一个"单行",相当于

def func():
    return  list(IT.islice(iterator, chunksize))
Run Code Online (Sandbox Code Playgroud)

最后,iter(callable, sentinel)返回另一个迭代器.此迭代器产生的值是callable返回的值.它继续产生值,直到callable返回一个等于sentinel的值.所以

iter(lambda: list(IT.islice(iterator, chunksize)), [])
Run Code Online (Sandbox Code Playgroud)

将继续返回值,list(IT.islice(iterator, chunksize))直到该值为空列表:

In [121]: iterator = iter(range(10))

In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))
Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
Run Code Online (Sandbox Code Playgroud)


gip*_*psy 6

首先,如果记录尚未在关键列上排序,则itertools.groupby将没有任何实际意义.此外,如果您的要求只是将csv文件块化为预定数量的行并将其提供给工作者,那么您不必执行所有这些操作.

一个简单的实现将是:

import csv
from multiprocessing import Pool


def worker(chunk):
    print len(chunk)

def emit_chunks(chunk_size, file_path):
    lines_count = 0
    with open(file_path) as f:
        reader = csv.reader(f)
        chunk = []
        for line in reader:
            lines_count += 1
            chunk.append(line)
            if lines_count == chunk_size:
                lines_count = 0
                yield chunk
                chunk = []
            else:
                continue
        if chunk : yield chunk

def main():
    chunk_size = 10
    gen = emit_chunks(chunk_size, 'c:/Temp/in.csv')
    p = Pool(5)
    p.imap(worker, gen)
    print 'Completed..'
Run Code Online (Sandbox Code Playgroud)

*编辑:更改为pool.imap而不是pool.map