Python Joblib Parallel:如何合并每个工作人员的结果?

Mig*_*uel 8 python parallel-processing ram joblib

语境

我有一个函数可以生成一个大型二维numpy数组(具有固定形状)作为输出。joblib我在 8 个 CPU 上使用(Parallel带有后端)调用此函数 1000 次multiprocessing。在工作结束时,我将所有数组按元素相加(使用np.sum)以生成我感兴趣的单个二维数组。但是,当我尝试这样做时,我耗尽了 RAM。我认为这是因为 1000 个数组需要存储在 RAM 中,直到最后求和。

问题

有没有办法让每个工作人员在运行时将其数组相加?例如,worker 1 会将数组 2 添加到数组 1,然后在计算数组 3 之前丢弃数组 2,依此类推。这样,在任何时间点,RAM 中最多只能存储 8 个数组(对于 8 个 CPU),并且最后可以对这些数组进行求和以获得相同的答案。

Dar*_*aut 5

事实上,您提前知道您的论点,并且计算时间与实际论点变化不大,这简化了任务。它允许在开始时为每个工作进程分配完整的作业,并在最后总结结果,就像您建议的那样。

在下面的代码中,每个生成的进程都会获得所有参数(其)的“相等”(尽可能多)部分args_batch,并在其自己的结果数组中总结调用目标函数的中间结果。这些数组最终由父进程求和。

示例中的“延迟”函数不是计算数组的目标函数,而是worker目标函数 () 与一批参数一起作为参数的calc_array一部分传递到的处理函数 ()。job

import numpy as np
from itertools import repeat
from time import sleep
from joblib import Parallel, delayed


def calc_array(v):
    """Create an array with specified shape and
    fill it up with value v, then kill some time.

    Dummy target function.
    """
    new_array = np.full(shape=SHAPE, fill_value=v)
    # delay result:
    cnt = 10_000_000
    for _ in range(cnt):
        cnt -= 1

    return new_array


def worker(func, args_batch):
    """Call func with every packet of arguments received and update
    result array on the run.

    Worker function which runs the job in each spawned process.
    """
    results = np.zeros(SHAPE)
    for args_ in args_batch:
        new_array = func(*args_)
        np.sum([results, new_array], axis=0, out=results)

    return results


def main(func, arguments, n_jobs, verbose):

    with Parallel(n_jobs=n_jobs, verbose=verbose) as parallel:

        # bundle up jobs:
        funcs = repeat(func, n_jobs)  # functools.partial seems not pickle-able
        args_batches = np.array_split(arguments, n_jobs, axis=0)
        jobs = zip(funcs, args_batches)

        result = sum(parallel(delayed(worker)(*job) for job in jobs))
        assert np.all(result == sum(range(CALLS_TOTAL)))

    sleep(1)  # just to keep stdout ordered
    print(result)


if __name__ == '__main__':

    SHAPE = (4, 4)  # shape of array calculated by calc_array
    N_JOBS = 8
    CALLS_TOTAL = 100
    VERBOSE = 10
    ARGUMENTS = np.asarray([*zip(range(CALLS_TOTAL))])
    # array([[0], [1], [2], ...]])
    # zip to bundle arguments in a container so we have less code to
    # adapt when feeding a function with multiple parameters

    main(func=calc_array, arguments=ARGUMENTS, n_jobs=N_JOBS, verbose=VERBOSE)
Run Code Online (Sandbox Code Playgroud)