如何写入python joblib中的共享变量

use*_*057 17 python parallel-processing shared-memory joblib

以下代码并行化for循环.

import networkx as nx;
import numpy as np;
from joblib import Parallel, delayed;
import multiprocessing;

def core_func(repeat_index, G, numpy_arrary_2D):
  for u in G.nodes():
    numpy_arrary_2D[repeat_index][u] = 2;
  return;

if __name__ == "__main__":
  G = nx.erdos_renyi_graph(100000,0.99);
  nRepeat = 5000;
  numpy_array = np.zeros([nRepeat,G.number_of_nodes()]);
  Parallel(n_jobs=4)(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
  print(np.mean(numpy_array));
Run Code Online (Sandbox Code Playgroud)

可以看出,要打印的期望值是2.但是,当我在集群(多核,共享内存)上运行我的代码时,它返回0.0.

我认为问题是每个工作者都创建自己的numpy_array对象副本,并且不更新在main函数中创建的副本.如何修改代码numpy_array以便更新numpy数组?

Ser*_*yev 8

joblib默认情况下使用多处理进程池,如其手册所示:

在引擎盖下,Parallel对象创建一个多处理池,在多个进程中分叉Python解释器来执行列表中的每个项目.延迟函数是一个简单的技巧,可以使用函数调用语法创建元组(函数,args,kwargs).

这意味着,每个进程都会继承数组的原始状态,但无论它在内部写入什么状态,都会在进程退出时丢失.只有函数结果被传递回调用(主)进程.但是你没有返回任何东西,所以None返回.

要使共享数组具有可编辑性,您有两种方法:使用线程和使用共享内存.


与进程不同,线程共享内存.所以你可以写入数组,每个工作都会看到这个变化.根据joblib手册,它是这样做的:

  Parallel(n_jobs=4, backend="threading")(delayed(core_func)(repeat_index, G, numpy_array) for repeat_index in range(nRepeat));
Run Code Online (Sandbox Code Playgroud)

当你运行它:

$ python r1.py 
2.0
Run Code Online (Sandbox Code Playgroud)

然而,当你会写复杂的事情到阵列中,请务必妥善处理好周围的数据或数据块的锁,否则你会打比赛的条件(谷歌它).

还要仔细阅读GIL,因为Python中的计算多线程是有限的(与I/O多线程不同).


如果您仍然需要进程(例如,因为GIL),您可以将该数组放入共享内存中.

这是一个更复杂的主题,但是手册中也显示了joblib + numpy共享内存示例joblib.