将数据传递给 Python multiprocessing.Pool 工作进程

The*_*inn 1 python python-multiprocessing

我正在尝试利用Pool.map(func, itr)来提高程序的性能,我需要func访问一个非常大的字典,称为cache以便它可以进行缓存查找。

cache卖场“每一个第一的二进制表示2**16的整数”。

cache = {i: bin(i) for i in range(2**16 - 1)}
Run Code Online (Sandbox Code Playgroud)

的职责func是计算传递给它的二进制表示中的1son-bits的数量int

def func(i: int) -> int:
    return cache[i].count("1")
Run Code Online (Sandbox Code Playgroud)

我想做如下事情:

with Pool(8) as pool:
    counts = pool.map(func, [i for i in range(2**16-1)])
Run Code Online (Sandbox Code Playgroud)

但是如何使cache对象func在每个工作子进程中可用?

The*_*inn 6

一个简单的解决方案

可以使用以下在互联网上找到的食谱来“超越”自己:

import functools

cache = {i: bin(i) for i in range(2**16 - 1)}

def func(i: int, cache: Dict[int, str]) -> int:
    return cache[i].count("1")


with Pool(8) as pool:
    # Bind 'cache' to 'func' and pass the partial to map()
    counts = pool.map(functools.partial(func, cache=cache),
                      [i for i in range(2**16-1)])
Run Code Online (Sandbox Code Playgroud)

这有效...直到您意识到这实际上比运行无并行化要慢!您最终在序列化/反序列化上花费的钱cache比从并行化中获得的投资回报率还要多。有关更深入的解释,请参阅Stuck in a Pickle

正确的解决方案

当前将数据复制到 Pool 工作子进程的“最佳实践”是以一种或另一种方式使变量global. 该模式如下所示:

cache = {i: bin(i) for i in range(2**16 - 1)}

def func(i: int) -> int:
    return global_cache[i].count("1")


def make_global(cache: Dict[int, str]) -> None:
    # Declare 'global_cache' to be Global
    global global_cache
    # Update 'global_cache' with a value, now *implicitly* accessible in func
    global_cache = cache


with Pool(8, initializer=make_global, initargs=(cache,)) as pool:
    counts = pool.map(func, [i for i in range(2**16-1)])
Run Code Online (Sandbox Code Playgroud)

同样的模式可以应用于面向对象的代码,为全局变量交换类属性我们通过这种方式购买了更多的封装。

关于函数体global内部关键字的说明make_global()'s

global上面的关键字声明了一个名为global_cache. 从声明 this 开始,直到程序结束, global_cache都可以在全局范围内访问,尽管它是在函数范围内声明的(尽管在分叉子进程之前不会“全球化”,隔离全局范围到工作进程)。

一个(提议的)新解决方案

还有第三种选择,尽管它存在于一个深埋在 github 存储库中的CPython叉子

这个 fork 提出了一项功能,允许您执行以下操作:

cache = {i: bin(i) for i in range(2**16 - 1)}

def func(i: int, initret: Dict[int, str]) -> int:
    cache = initret  # Re-assign var for illustrative/readability purposes
    return cache[i].count("1")


def identity(cache: Dict[int, str]) -> Dict[int, str]:
    return cache


with Pool(8, initializer=identity, initargs=(cache,)) as pool:
    counts = pool.map(func, [i for i in range(2**16-1)])
Run Code Online (Sandbox Code Playgroud)

虽然这是一个很小的变化,但它绕过了使用全局变量,并允许父进程和工作进程之间的“数据流”更具可读性。更多关于这里

本质上,每次在工作进程中调用时,initializeridentity()上面)的返回值都会传递给func(作为一个名为的kwarginitretfunc

注意: 我是上述所有链接博客文章的作者。

  • 由于您发布此答案已经有一段时间了,您能否更新此后是否有任何更改?对于最新版本的 python 3.7 甚至 3.8?因为,据我了解,“提议的”解决方案并不是基于官方的 python 版本,对吗? (2认同)