Python:处理大文件的最快方法

Rei*_*e45 28 python file filereader python-2.7

我有多个3GB制表符分隔文件.每个文件中有2000万行.所有行必须独立处理,任何两行之间没有关系.我的问题是,什么会更快A.使用逐行阅读

with open() as infile:
    for line in infile:
Run Code Online (Sandbox Code Playgroud)

或者B.以块的形式将文件读入内存并进行处理,一次说250 MB?

处理不是很复杂,我只是将column1中的值抓到List1,将column2抓到List2等.可能需要一起添加一些列值.

我在具有30GB内存的Linux机器上使用python 2.7.ASCII文本.

有什么方法可以加速并行?现在我正在使用前一种方法,而且过程非常缓慢.使用任何CSVReader模块都可以提供帮助吗?我不必在python中使用它,任何其他语言或数据库使用的想法都是受欢迎的.谢谢.

aba*_*ert 33

听起来你的代码是I/O绑定的.这意味着多处理不会有所帮助 - 如果您花费90%的时间从磁盘读取数据,那么在下次读取时等待额外的7个进程对任何事情都无济于事.

而且,虽然使用CSV读取模块(无论是stdlib csv还是NumPy或Pandas之类的东西)可能是一个简单的好主意,但它不太可能在性能上有太大差异.

不过,值得检查一下你是否真的 I/O限制,而不仅仅是猜测.运行程序,查看CPU使用率是接近0%还是接近100%或核心.做Amadan在评论中提出的建议,并运行你的程序只是pass为了处理,看看是否会减少5%的时间或70%.你甚至可能想尝试比较一个循环os.open和/ os.read(1024*1024)或某事,看看是否更快.


由于你使用Python 2.x,Python依靠C stdio库来猜测一次缓冲多少,因此可能值得强制缓冲更多.最简单的方法是使用readlines(bufsize)一些大的bufsize.(您可以尝试不同的数字并测量它们以查看峰值的位置.根据我的经验,通常64K-8MB的任何东西大致相同,但取决于您的系统可能会有所不同 - 特别是如果您是,例如,阅读关闭网络文件系统,吞吐量很大,但延迟时间很长,淹没了实际物理驱动器的吞吐量与延迟以及操作系统的缓存.)

所以,例如:

bufsize = 65536
with open(path) as infile: 
    while True:
        lines = infile.readlines(bufsize)
        if not lines:
            break
        for line in lines:
            process(line)
Run Code Online (Sandbox Code Playgroud)

同时,假设您使用的是64位系统,您可能希望尝试使用mmap而不是首先读取文件.这当然不能保证更好,但可能会更好,具体取决于您的系统.例如:

with open(path) as infile:
    m = mmap.mmap(infile, 0, access=mmap.ACCESS_READ)
Run Code Online (Sandbox Code Playgroud)

Python mmap是一种奇怪的对象 - 它同时像a str和a file一样,所以你可以,例如,手动迭代扫描换行,或者你可以调用readline它就好像它是一个文件.这两个将从Python中获取更多的处理,而不是将文件作为行或批处理进行迭代readlines(因为在C中的循环现在是纯Python ...虽然你可以用它来解决这个问题re,或者使用简单的Cython扩展?) ...但是操作系统的I/O优势知道你正在使用映射做什么可能会淹没CPU的劣势.

不幸的是,Python没有公开madvise你用来调整东西的调用,试图在C中优化它(例如,显式设置MADV_SEQUENTIAL而不是让内核猜测,或强制透明的大页面) - 但你实际上可以实现ctypes这个功能出来的libc.


Dee*_*ini 5

我知道这个问题很老;但我想做类似的事情,我创建了一个简单的框架,可以帮助您并行读取和处理大文件。留下我尝试过的作为答案。

这是代码,我最后举个例子

def chunkify_file(fname, size=1024*1024*1000, skiplines=-1):
    """
    function to divide a large text file into chunks each having size ~= size so that the chunks are line aligned

    Params : 
        fname : path to the file to be chunked
        size : size of each chink is ~> this
        skiplines : number of lines in the begining to skip, -1 means don't skip any lines
    Returns : 
        start and end position of chunks in Bytes
    """
    chunks = []
    fileEnd = os.path.getsize(fname)
    with open(fname, "rb") as f:
        if(skiplines > 0):
            for i in range(skiplines):
                f.readline()

        chunkEnd = f.tell()
        count = 0
        while True:
            chunkStart = chunkEnd
            f.seek(f.tell() + size, os.SEEK_SET)
            f.readline()  # make this chunk line aligned
            chunkEnd = f.tell()
            chunks.append((chunkStart, chunkEnd - chunkStart, fname))
            count+=1

            if chunkEnd > fileEnd:
                break
    return chunks

def parallel_apply_line_by_line_chunk(chunk_data):
    """
    function to apply a function to each line in a chunk

    Params :
        chunk_data : the data for this chunk 
    Returns :
        list of the non-None results for this chunk
    """
    chunk_start, chunk_size, file_path, func_apply = chunk_data[:4]
    func_args = chunk_data[4:]

    t1 = time.time()
    chunk_res = []
    with open(file_path, "rb") as f:
        f.seek(chunk_start)
        cont = f.read(chunk_size).decode(encoding='utf-8')
        lines = cont.splitlines()

        for i,line in enumerate(lines):
            ret = func_apply(line, *func_args)
            if(ret != None):
                chunk_res.append(ret)
    return chunk_res

def parallel_apply_line_by_line(input_file_path, chunk_size_factor, num_procs, skiplines, func_apply, func_args, fout=None):
    """
    function to apply a supplied function line by line in parallel

    Params :
        input_file_path : path to input file
        chunk_size_factor : size of 1 chunk in MB
        num_procs : number of parallel processes to spawn, max used is num of available cores - 1
        skiplines : number of top lines to skip while processing
        func_apply : a function which expects a line and outputs None for lines we don't want processed
        func_args : arguments to function func_apply
        fout : do we want to output the processed lines to a file
    Returns :
        list of the non-None results obtained be processing each line
    """
    num_parallel = min(num_procs, psutil.cpu_count()) - 1

    jobs = chunkify_file(input_file_path, 1024 * 1024 * chunk_size_factor, skiplines)

    jobs = [list(x) + [func_apply] + func_args for x in jobs]

    print("Starting the parallel pool for {} jobs ".format(len(jobs)))

    lines_counter = 0

    pool = mp.Pool(num_parallel, maxtasksperchild=1000)  # maxtaskperchild - if not supplied some weird happend and memory blows as the processes keep on lingering

    outputs = []
    for i in range(0, len(jobs), num_parallel):
        print("Chunk start = ", i)
        t1 = time.time()
        chunk_outputs = pool.map(parallel_apply_line_by_line_chunk, jobs[i : i + num_parallel])

        for i, subl in enumerate(chunk_outputs):
            for x in subl:
                if(fout != None):
                    print(x, file=fout)
                else:
                    outputs.append(x)
                lines_counter += 1
        del(chunk_outputs)
        gc.collect()
        print("All Done in time ", time.time() - t1)

    print("Total lines we have = {}".format(lines_counter))

    pool.close()
    pool.terminate()
    return outputs
Run Code Online (Sandbox Code Playgroud)

比如说,我有一个文件,我想在其中计算每行中的单词数,那么每行的处理看起来像

def count_words_line(line):
    return len(line.strip().split())
Run Code Online (Sandbox Code Playgroud)

然后调用函数,如:

parallel_apply_line_by_line(input_file_path, 100, 8, 0, count_words_line, [], fout=None)
Run Code Online (Sandbox Code Playgroud)

使用它,与在大小为 ~20GB 的样本文件上逐行读取相比,我的速度提高了 ~8 倍,其中我对每一行进行了一些中等复杂的处理。