How do I parse a large file using threads in Python?

cul*_*rón 2 python parallel-processing multithreading large-files

I have a huge file that I need to read and process.

with open(source_filename) as source, open(target_filename, "w") as target:
    for line in source:
        target.write(do_something(line))

    do_something_else()

Can this be sped up with threads? If I spawn one thread per line, will that create huge overhead?

Edit: to keep this question from turning into a discussion, what should the code look like?

with open(source_filename) as source, open(target_filename) as target:
   ?

@Nicoretti: In each iteration, I need to read a line that is several KB in size.

Update 2: the file may be bz2, so Python may have to wait for decompression:

$ bzip2 -dc country.osm.bz2 | ./my_script.py
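An alternative worth noting: the standard-library bz2 module can decompress on the fly, so the script can open the compressed file directly instead of relying on a pipe. A minimal sketch, reusing the filename from the command above and the do_something from the question:

import bz2

# bz2.BZ2File yields lines like a regular file object, so
# decompression happens inside the reading loop.
# (Under Python 3 the lines are bytes, not str.)
with bz2.BZ2File("country.osm.bz2") as source:
    for line in source:
        do_something(line)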

Jan*_*ila 7

You could use three threads: one for reading, one for processing, and one for writing. The possible advantage is that processing can take place while waiting for I/O. Keep in mind, though, that CPython's GIL prevents pure-Python processing from running in parallel with other Python code (I/O waits do release the GIL), so you need to do some timing yourself to find out whether there is an actual benefit in your case.

import threading
import Queue  # named "queue" in Python 3

QUEUE_SIZE = 1000
sentinel = object()  # unique end-of-stream marker

def read_file(name, queue):
    # Reader thread: feed lines into the queue, then signal completion.
    with open(name) as f:
        for line in f:
            queue.put(line)
    queue.put(sentinel)

def process(inqueue, outqueue):
    # Worker thread: transform lines until the sentinel arrives,
    # then pass the sentinel on to the writer.
    for line in iter(inqueue.get, sentinel):
        outqueue.put(do_something(line))
    outqueue.put(sentinel)

def write_file(name, queue):
    # Writer (runs in the main thread below): drain until the sentinel.
    with open(name, "w") as f:
        for line in iter(queue.get, sentinel):
            f.write(line)

inq = Queue.Queue(maxsize=QUEUE_SIZE)
outq = Queue.Queue(maxsize=QUEUE_SIZE)

threading.Thread(target=read_file, args=(source_filename, inq)).start()
threading.Thread(target=process, args=(inq, outq)).start()
write_file(target_filename, outq)
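If you want a clean shutdown, keep references to the two helper threads and join them after the writer finishes; a small variation on the last three lines above (same names, nothing else changes):

reader = threading.Thread(target=read_file, args=(source_filename, inq))
worker = threading.Thread(target=process, args=(inq, outq))
reader.start()
worker.start()

write_file(target_filename, outq)  # main thread does the writing
reader.join()                      # then waits for the helpers to exit
worker.join()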

Setting a maxsize for the queues is a good idea; it prevents ever-growing memory consumption. The value of 1000 was an arbitrary choice on my part.
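Note that the module was renamed from Queue to queue in Python 3; if the script needs to run under both, a common compatibility import is:

try:
    import queue as Queue  # Python 3
except ImportError:
    import Queue           # Python 2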

  • @Satarangi_Re Yes, limiting the queue size is a good idea. I edited the answer accordingly. (2 upvotes)