在 Python 中并行从磁盘读取文件

Cro*_*opy 7 python parallel-processing for-loop

我正在从 MATLAB 迁移到 Python,主要是因为 Python 中有大量有趣的机器学习包可用。但让我感到困惑的问题之一是并行处理。特别是,我想从磁盘中for循环读取数千个文本文件,并且我想并行执行。在 MATLAB 中,使用parfor而不是for可以解决问题,但到目前为止我还没有弄清楚如何在 python 中做到这一点。这是我想要做的一个例子。我想读取 N 个文本文件,将它们组成一个 N1xN2 数组,并将每个文件保存到一个 NxN1xN2 numpy 数组中。这个数组将是我从函数返回的内容。假设文件名是file0001.datfile0002.dat等,我喜欢并行化的代码如下:

import numpy as np
N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])
for counter in range(N):
    t_str="%.4d" % counter        
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    result[counter,:,:]=temp_array
Run Code Online (Sandbox Code Playgroud)

我在集群上运行代码,所以我可以使用许多处理器来完成这项工作。因此,任何关于哪种并行化方法更适合我的任务(如果有多个)的评论都是最受欢迎的。

注意:我知道这篇文章,但在那篇文章中,只有out1, out2,out3变量需要担心,并且它们已被明确用作要并行化的函数的参数。但是在这里,我有许多 2D 数组应该从文件中读取并保存到 3D 数组中。所以,这个问题的答案对我的情况来说不够通用(或者我是这样理解的)。

The*_*Cat 5

你可能仍然想使用多处理,只是结构有点不同:

from multiprocessing import Pool

import numpy as np

N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])

filenames = ('file_%.4d.dat' % i for i in range(N))
myshaper = lambda fname: np.loadtxt(fname).reshape([N1, nN2])

pool = Pool()    
for i, temparray in enumerate(pool.imap(myshaper, filenames)):
    result[i, :, :] = temp_array
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

这样做的目的是首先为filenames. 这意味着文件名不存储在内存中,但您仍然可以遍历它们。接下来,它创建一个 lambda 函数(相当于 matlab 中的匿名函数),用于加载和重塑文件(您也可以使用普通函数)。然后它将该函数应用于使用多个进程的每个文件名,并将结果放入整个数组中。然后它关闭进程。

这个版本使用了一些更惯用的python。然而,一种更类似于你原来的方法(虽然不那么惯用)可能会帮助你更好地理解:

from multiprocessing import Pool

import numpy as np

N=10000
N1=200
N2=100
result = np.empty([N, N1, N2])

def proccounter(counter):
    t_str="%.4d" % counter        
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    return counter, temp_array

pool = Pool()
for counter, temp_array in pool.imap(proccounter, range(N)):
    result[counter,:,:] = temp_array
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

这只是将大部分for循环拆分为一个函数,使用多个处理器将该函数应用于范围的每个元素,然后将结果放入数组中。它基本上只是您的原始函数,for循环分为两个for循环。


Cro*_*opy 4

可以使用joblib库来完成,如下所示:

def par_func(N1, N2, counter):
    import numpy as np
    t_str="%.4d" % counter   
    filename = 'file_'+t_str+'.dat'
    temp_array = np.loadtxt(filename)
    temp_array.shape=[N1,N2]
    # temp_array = np.random.randn(N1, N2)  # use this line to test
    return temp_array

if __name__ == '__main__':
    import numpy as np

    N=1000
    N1=200
    N2=100

    from joblib import Parallel, delayed
    num_jobs = 2
    output_list = Parallel(n_jobs=num_jobs)(delayed(par_func) 
                                            (N1, N2, counter)
                                            for counter in range(N)) 

    output_array = np.array(output_list)
Run Code Online (Sandbox Code Playgroud)