通过多处理应用减少发电机输出

sul*_*keh 5 python generator python-multiprocessing

我有一个像这样工作的生成器函数(Python)

def Mygenerator(x, y, z, ...):
    while True:
        # code that makes two matrices based on sequences of input arrays
        yield (matrix1, matrix2)
Run Code Online (Sandbox Code Playgroud)

我想要做的是添加此生成器的输出.这条线完成了这项工作:

M1, M2 = reduce(lambda x, y: x[0] + y[0], x[1] + y[1], Mygenerator(x, y, z, ...))
Run Code Online (Sandbox Code Playgroud)

我想将其并行化以加速计算.重要的是Mygenerator的输出会随着它的产生而减少,因为它list(Mygenerator(...))会占用太多内存.

sul*_*keh 3

为了回答我自己的问题,我找到了一个似乎按我希望的方式工作的解决方案:

首先,Mygenerator不再是一个生成器而是一个函数。另外,我现在不再循环遍历 x、y 和 z 的段,而是将一个段传递给函数:

def Myfunction(x_segment, y_segment, z_segment):
        # code that makes two matrices based on input arrays
        return (matrix1, matrix2)
Run Code Online (Sandbox Code Playgroud)

multiprocessing.Pool与(generator) 函数一起使用imap似乎有效:

pool = multiprocessing.Pool(ncpus)
results = pool.imap(Myfunction, 
                    ( (x[i], y[i], z[i]) for i in range(len(x)) )
M1, M2 = reduce(lambda r1, r2: (r1[0] + r2[0], r1[1] + r2[1]), 
                    (result for result in results))
pool.close()
pool.join()
Run Code Online (Sandbox Code Playgroud)

我将lambda 表达式中的xand更改为and以避免与其他同名变量混淆。当尝试使用生成器时, 我在 pickle 方面遇到了一些麻烦。yr1r2multiprocessing

这个解决方案唯一令人失望的是它并没有真正加快计算速度。我想这与开销操作有关。当使用8核时,处理速度提高了约10%。当减少到 4 个核心时,速度增加了一倍。这似乎是我对我的特定任务能做的最好的事情,除非有其他方法来进行并行化......

imap此处必须使用该函数,因为在操作map之前会将所有返回值存储在内存中reduce,但在这种情况下这是不可能的。