处理来自多个进程的单个文件

pra*_*avk 67 python multithreading multiprocessing

我有一个大文本文件,我想处理每一行(做一些操作)并将它们存储在数据库中.由于单个简单程序花费的时间太长,我希望它可以通过多个进程或线程来完成.每个线程/进程应从该单个文件中读取不同的数据(不同的行),并对其数据(行)执行一些操作并将它们放入数据库中,以便最终处理完所有数据并进行处理.数据库与我需要的数据一起转储.

但我无法弄清楚如何处理这个问题.

jdi*_*jdi 92

您正在寻找的是生产者/消费者模式

基本线程示例

这是使用线程模块的基本示例(而不是多处理)

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()
Run Code Online (Sandbox Code Playgroud)

您不会与线程共享文件对象.您可以通过向队列提供数据行来为它们生成工作.然后每个线程将获取一行,处理它,然后将其返回队列中.

多处理模块中内置了一些更高级的工具来共享数据,例如列表和特殊类型的Queue.使用多处理与线程之间需要权衡,这取决于您的工作是cpu绑定还是IO绑定.

基本的多处理.Pool示例

这是一个多处理池的基本示例

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results
Run Code Online (Sandbox Code Playgroud)

Pool是一个便利对象,可以管理自己的进程.由于打开的文件可以迭代它的行,你可以将它传递给它pool.map(),它将循环遍历它并将行传递给worker函数.映射块并在完成后返回整个结果.请注意,这是一个过于简化的示例,并且pool.map()会在完成工作之前将您的整个文件一次性读入内存.如果您希望有大文件,请记住这一点.有更先进的方法来设计生产者/消费者设置.

手动"池",带限制和行重新排序

这是Pool.map的一个手动示例,但是您可以设置一个队列大小,而不是一次性消耗整个迭代,这样您就可以尽可能快地将它一次性地提供给它.我还添加了行号,以便您可以跟踪它们并在以后根据需要引用它们.

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
Run Code Online (Sandbox Code Playgroud)

  • @ jwillis0720-好的。`(None,)* num_workers`创建一个None值的元组,其值等于worker数量的大小。这些将是指示每个线程退出的哨兵值,因为没有更多工作了。使用itertools.chain函数,您可以将多个序列放到一个虚拟序列中,而不必复制任何内容。因此,我们得到的是,它首先循环遍历文件中的各行,然后遍历None值。 (2认同)
  • 这比我的教授更好解释,非常好+1. (2认同)

mgi*_*son 7

这是我制作的一个非常愚蠢的例子:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...
Run Code Online (Sandbox Code Playgroud)

这里棘手的部分是确保我们将文件拆分为换行符,以便您不会错过任何行(或只读取部分行).然后,每个进程读取它的文件的一部分并返回一个对象,该对象可以由主线程放入数据库.当然,您甚至可能需要在块中执行此部分,这样您就不必一次将所有信息保存在内存中.(这很容易实现 - 只需将"args"列表拆分为X块并调用pool.map(wrapper,chunk) - 请参阅此处)