Mig*_*uel 8 python parallel-processing ram joblib
语境
我有一个函数可以生成一个大型二维numpy数组(具有固定形状)作为输出。joblib我在 8 个 CPU 上使用(Parallel带有后端)调用此函数 1000 次multiprocessing。在工作结束时,我将所有数组按元素相加(使用np.sum)以生成我感兴趣的单个二维数组。但是,当我尝试这样做时,我耗尽了 RAM。我认为这是因为 1000 个数组需要存储在 RAM 中,直到最后求和。
问题
有没有办法让每个工作人员在运行时将其数组相加?例如,worker 1 会将数组 2 添加到数组 1,然后在计算数组 3 之前丢弃数组 2,依此类推。这样,在任何时间点,RAM 中最多只能存储 8 个数组(对于 8 个 CPU),并且最后可以对这些数组进行求和以获得相同的答案。
事实上,您提前知道您的论点,并且计算时间与实际论点变化不大,这简化了任务。它允许在开始时为每个工作进程分配完整的作业,并在最后总结结果,就像您建议的那样。
在下面的代码中,每个生成的进程都会获得所有参数(其)的“相等”(尽可能多)部分args_batch,并在其自己的结果数组中总结调用目标函数的中间结果。这些数组最终由父进程求和。
示例中的“延迟”函数不是计算数组的目标函数,而是worker目标函数 () 与一批参数一起作为参数的calc_array一部分传递到的处理函数 ()。job
import numpy as np
from itertools import repeat
from time import sleep
from joblib import Parallel, delayed
def calc_array(v):
"""Create an array with specified shape and
fill it up with value v, then kill some time.
Dummy target function.
"""
new_array = np.full(shape=SHAPE, fill_value=v)
# delay result:
cnt = 10_000_000
for _ in range(cnt):
cnt -= 1
return new_array
def worker(func, args_batch):
"""Call func with every packet of arguments received and update
result array on the run.
Worker function which runs the job in each spawned process.
"""
results = np.zeros(SHAPE)
for args_ in args_batch:
new_array = func(*args_)
np.sum([results, new_array], axis=0, out=results)
return results
def main(func, arguments, n_jobs, verbose):
with Parallel(n_jobs=n_jobs, verbose=verbose) as parallel:
# bundle up jobs:
funcs = repeat(func, n_jobs) # functools.partial seems not pickle-able
args_batches = np.array_split(arguments, n_jobs, axis=0)
jobs = zip(funcs, args_batches)
result = sum(parallel(delayed(worker)(*job) for job in jobs))
assert np.all(result == sum(range(CALLS_TOTAL)))
sleep(1) # just to keep stdout ordered
print(result)
if __name__ == '__main__':
SHAPE = (4, 4) # shape of array calculated by calc_array
N_JOBS = 8
CALLS_TOTAL = 100
VERBOSE = 10
ARGUMENTS = np.asarray([*zip(range(CALLS_TOTAL))])
# array([[0], [1], [2], ...]])
# zip to bundle arguments in a container so we have less code to
# adapt when feeding a function with multiple parameters
main(func=calc_array, arguments=ARGUMENTS, n_jobs=N_JOBS, verbose=VERBOSE)
Run Code Online (Sandbox Code Playgroud)