使用 joblib 和 SLURM 在 Python 中并行化 for 循环

plu*_*mms 1 python parallel-processing for-loop slurm joblib

我有一个包含 100 个元组的列表tuplelist,用作外部函数的输入。外部函数返回一个值,并且该值被附加到一个数组中,如下所示 ( MainFile.py):

from ExternalPythonFile import ExternalFunction

valuelist = []
for a,b in tuplelist:
    value = ExternalFunction(a,b)
    # more functions here
    valuelist.append(value)
print(len(valuelist))
Run Code Online (Sandbox Code Playgroud)

print(len(valuelist))使用上面的 for 循环时的输出是(100,)

现在,由于元组的顺序以及它们的附加方式在我的情况下并不重要,因此我想并行化 for 循环,因为处理 100 个元组需要大约 10 分钟,并且我希望扩展该数字。我尝试了下面的 joblib 实现(MainFileJoblib.py):

from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing

valuelist = []

def TupleFunction(a,b):
        value = ExternalFunction(a,b)
        # more functions here
        valuelist.append(value)

with parallel_backend('multiprocessing'):
    Parallel(n_jobs=10)(delayed(TupleFunction)(a,b) for a,b in tuplelist)

print(len(valuelist))
Run Code Online (Sandbox Code Playgroud)

我在 unix 计算集群上运行所有这些,但运行时间仍然相似,约为 8 分钟。输出也是错误的,打印了(0,)

我发现htop实际上有 10 个核心正在使用,但每个核心仅使用 20%。

我还尝试通过 SLURM 运行 joblib 实现:

srun --ntasks=1 --ncpus-per-task=10 python3 MainFileJoblib.py
Run Code Online (Sandbox Code Playgroud)

这绝对更快,大约 2 分钟左右,但它再次给出了错误的结果(0,)

并行化原始 for 循环的最佳方法是什么?

mad*_*man 5

Joblib 自行管理输出列表的创建和填充,因此可以通过以下方式轻松修复代码:

from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing


with parallel_backend('multiprocessing'):
    valuelist = Parallel(n_jobs=10)(delayed(ExternalFunction)(a, b) for a, b in tuplelist)

print(len(valuelist))
Run Code Online (Sandbox Code Playgroud)

如果由于某种原因您需要更新类似数组的对象,您可以使用 numpy memmap,如以下最小示例所示:

import tempfile
import numpy as np
from ExternalPythonFile import ExternalFunction
from joblib import Parallel, delayed, parallel_backend
import multiprocessing


# define function to update your array
def fill_array(mm_file, i, tuple_val):
    a, b = tuple_val
    value = ExternalFunction(a, b)
    # more functions here
    mm_file[i] = value

# create a temporary folder
tmp_dir = tempfile.mkdtemp()
# create a file where to dump your array
values_fname_memmap = Path(tmp_dir).joinpath("values_memmap")
values_memmap = np.memmap(values_fname_memmap.as_posix(),
                          dtype=np.float,
                          shape=(len(tuplelist), ),
                          mode='w+')

with parallel_backend('multiprocessing'):
    Parallel(n_jobs=10)(delayed(fill_array)(values_memmap, i, ab) 
                        for i, ab in enumerate(tuplelist))

print(len(values_memmap))
Run Code Online (Sandbox Code Playgroud)

如果您需要对值应用一组转换(#morefunctions),只需在ExternalFunction周围创建一个包装器,它输出给定元组(a,b)所需的值。

我希望尽管回复较晚,但它仍然对您有用。