并行执行和python上的文件写入

DOS*_*ter 3 python parallel-processing multithreading multiprocessing

我有一个非常大的数据集分布在10个大集群中,任务是为每个集群做一些计算,并逐行将结果写入(追加)到10个文件中,其中每个文件包含对应于10个集群中的每个集群的结果,每个集群可以独立计算,我想将代码并行化为十个CPU(或线程),这样我就可以一次对所有集群进行计算,我的任务的简化伪代码如下:

for(c in range (1,10)):  #this is the loop over the clusters
    for(l in "readlines from cluster C")
         # do some computations for line l in cluster c
         # append the results in file named "cluster_c" one file for each cluter c
Run Code Online (Sandbox Code Playgroud)

Dav*_*dmh 8

您可以使用joblib来并行化分析.如果你有一个功能process_line:

from joblib import Parallel, delayed
data = Parallel(n_jobs=-1)(delayed(process_line)(line)
                           for line in open('bigfile'))
Run Code Online (Sandbox Code Playgroud)

您想要按顺序保存信息.根据要保存的数据的计算时间/大小的比率,您可以使用不同的方法:

获得一些数字需要大量的计算时间

线程之间通信的开销非常小.最简单的选择是每个进程在一个独立的文件上写入,然后将结果汇总到最后.您可以通过传递索引并使用它来创建文件来确保不会覆盖.

更高级的解决方案是将文件处理程序作为参数传递,并在获取multiprocessing.Lock后才写入文件.唯一的问题是,如果许多进程试图同时获取锁,它们将占用CPU资源而不是计算.

def process_line(line, outfile, lock)
   data = line[0]
   lock.aquire()
   print >> outfile, data
   lock.release()
Run Code Online (Sandbox Code Playgroud)

缩短计算时间

如果您有更多数据,写入文件可能会产生一些开销,特别是如果您打算在内存中重新加载它.这里有两个选择:

  • 所有数据都适合记忆:你很幸运.使用joblib,只需将其作为函数的返回.最后,您有一个列表,其中包含您的所有结果.
  • 数据不适合内存,您必须动态使用它.您需要一个消费者 - 生产者模式.就像是:

    from multiprocessing import Process, JoinableQueue
    from joblib import Parallel, delayed
    
    def saver(q):
        with open('out.txt', 'w') as out:
            while True:
                val = q.get()
                if val is None: break
                print >> out, val
                q.task_done()
            # Finish up
            q.task_done()
    
    def foo(x):
        q.put(x**3+2)
    
    q = JoinableQueue()
    p = Process(target=saver, args=(q,))
    p.start()
    Parallel(n_jobs=2, verbose=0)(delayed(foo)(i) for i in xrange(1000))
    q.put(None) # Poison pill
    q.join()
    p.join()
    
    Run Code Online (Sandbox Code Playgroud)

如果数据量与计算时间相比非常大,您会发现将数据从一个进程传输到其他进程的开销很大.如果这是你的限制,那么你应该使用更高级的技术,比如OpenMP,或者使用Cython来摆脱GIL,并使用线程而不是进程.

请注意,我没有说明"小"是多么小; 因为这在很大程度上取决于您的群集的配置.通信,底层文件系统等的速度有多快; 但是没有什么是你不能相当容易地试验的,例如,计算虚拟程序将一行发送到另一个进程所需的时间.


aru*_*aku 5

就像@Davidmh的答案一样,但在python3中工作:

from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def saver(q):
    with open('out.txt', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
        q.task_done()
        # Finish up
        q.task_done()

def foo(x):
    import os
    q.put(str(os.getpid()) + '-' + str(x**3+2))

q = JoinableQueue()
p = Process(target=saver, args=(q,))
p.start()
Parallel(n_jobs=-1, verbose=0)(delayed(foo)(i) for i in range(1000))
q.put(None) # Poison pill
p.join()
Run Code Online (Sandbox Code Playgroud)

PS:我还在每个输出行中添加了 PID,以检查一切是否按预期工作 ;-)


jfs*_*jfs 3

#!/usr/bin/env python
from multiprocessing import Pool

def compute_cluster(c):
    """each cluster can be computed independently"""
    ... # compute a cluster here 

if __name__=="__main__":
   pool = Pool(10) # run 10 task at most in parallel
   pool.map(compute_cluster, range(10))
Run Code Online (Sandbox Code Playgroud)