将CSV文件拆分成相等的部分?

Col*_*lin 2 python csv null byte split

我有一个大的CSV文件,我想分成一个等于系统中CPU核心数的数字.我想然后使用多进程让所有核心一起处理文件.但是,我甚至无法将文件拆分成部分.我看了整整一遍google,我发现了一些看似符合我想要的示例代码.这是我到目前为止:

def split(infilename, num_cpus=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    total_file_size = os.path.getsize(infilename)
    print total_file_size
    files = list()
    with open(infilename, 'rb') as infile:
        for i in xrange(num_cpus):
            files.append(tempfile.TemporaryFile())
            this_file_size = 0
            while this_file_size < 1.0 * total_file_size / num_cpus:
                files[-1].write(infile.read(READ_BUFFER))
                this_file_size += READ_BUFFER
        files[-1].write(infile.readline()) # get the possible remainder
        files[-1].seek(0, 0)
    return files

files = split("sample_simple.csv")
print len(files)

for ifile in files:
    reader = csv.reader(ifile)
    for row in reader:
        print row
Run Code Online (Sandbox Code Playgroud)

这两个打印显示正确的文件大小,它被分成4个部分(我的系统有4个CPU内核).

但是,打印每个部分中所有行的代码的最后一部分给出了错误:

for row in reader:
_csv.Error: line contains NULL byte
Run Code Online (Sandbox Code Playgroud)

我尝试打印行而不运行split函数,它正确打印所有值.我怀疑split函数在生成的4个文件中添加了一些NULL字节,但我不确定原因.

有谁知道这是一个正确而快速的方法来拆分文件?我只想要csv.reader可以成功读取的结果片段.

mar*_*eau 5

正如我在评论中所说,csv文件需要在行(或行)边界上拆分.你的代码没有这样做,可能会在一个中间的地方将它们分解 - 我怀疑这是你的原因_csv.Error.

以下通过将输入文件作为一系列行处理来避免这样做.我已经对它进行了测试,它似乎独立工作,因为它将示例文件划分为大致相同大小的块,因为整行数不太可能完全适合一个块.

更新

这是一个基本的代码的速度更快的版本比我最初发布.改进是因为它现在使用临时文件自己的tell()方法来确定文件的不断变化的长度,因为它正在被写入而不是调用os.path.getsize(),这消除了flush()对文件的需要并os.fsync()在每行写入后调用它.

import csv
import multiprocessing
import os
import tempfile

def split(infilename, num_chunks=multiprocessing.cpu_count()):
    READ_BUFFER = 2**13
    in_file_size = os.path.getsize(infilename)
    print 'in_file_size:', in_file_size
    chunk_size = in_file_size // num_chunks
    print 'target chunk_size:', chunk_size
    files = []
    with open(infilename, 'rb', READ_BUFFER) as infile:
        for _ in xrange(num_chunks):
            temp_file = tempfile.TemporaryFile()
            while temp_file.tell() < chunk_size:
                try:
                    temp_file.write(infile.next())
                except StopIteration:  # end of infile
                    break
            temp_file.seek(0)  # rewind
            files.append(temp_file)
    return files

files = split("sample_simple.csv", num_chunks=4)
print 'number of files created: {}'.format(len(files))

for i, ifile in enumerate(files, start=1):
    print 'size of temp file {}: {}'.format(i, os.path.getsize(ifile.name))
    print 'contents of file {}:'.format(i)
    reader = csv.reader(ifile)
    for row in reader:
        print row
    print ''
Run Code Online (Sandbox Code Playgroud)

  • @Colin:文件的划分本质上是一个耗时的过程,因为它至少涉及读取和写入整个文件的数据.当我按照文档添加`os.fsync()`时,有一个明显的减速,即使它似乎在我的系统上没有它.如果可接受的大小相等的临时文件,您可以只比较每个其他或每三行的大小.另一种方法是从数学上精确的分割点开始,然后通过从该位置向前读取到最近的换行符来调整它们中的每一个. (2认同)