如何在类中使用多处理管理器

Ale*_*are 4 python multiprocessing

首先,这是一些有效的代码

from multiprocessing import Pool, Manager
import random

manager = Manager()
dct = manager.dict()

def do_thing(n):
    for i in range(10_000_000):
        i += 1
    dct[n] = random.randint(0, 9)

with Pool(2) as pool:
    pool.map(do_thing, range(10))
Run Code Online (Sandbox Code Playgroud)

现在,如果我尝试用它来创建一个类:

from multiprocessing import Pool, Manager
import random


class SomeClass:
    def __init__(self):
        self.manager = Manager()
        self.dct = self.manager.dict()

    def __call__(self):
        with Pool(2) as pool:
            pool.map(self.do_thing, range(10))

    def do_thing(self, n):
        for i in range(10_000_000):
            i += 1
        self.dct[n] = random.randint(0, 9)


if __name__ == '__main__':
    inst = SomeClass()
    inst()
Run Code Online (Sandbox Code Playgroud)

我遇到:TypeError: Pickling an AuthenticationString object is disallowed for security reasons。现在从这里,我得到提示,Python 正在尝试腌制,Manager据我所知,它有自己的专用进程,并且进程不能被腌制,因为它们包含AuthenticationString.

我对分叉的工作原理了解不够(我在 Linux 上,所以我知道这是启动新进程的默认方法),无法准确理解为什么Manager需要对实例进行 pickle。

这是我的问题:

  1. 为什么会发生这种情况?
  2. Manager在类中进行多重处理时如何使用 a ?PS:我希望能够从这个模块导入 SomeClass 。
  3. 我的要求是否不合理或非常规?

PS:我知道我可以完成这个精确的片段,而无需Manager利用将按顺序返回内容的事实pool.map,所以像这样:res = pool.map(self.do_thing, range(10))then dct = {k: v for k, v in zip(range(10), res)}。但这不是问题的重点。

mar*_*eau 5

回答您的问题:

\n

Q1 - 为什么会发生这种情况?

\n

创建的每个工作进程都需要Pool.map()执行实例方法self.do_thing()。为了做到这一点,Python pickle 实例并将其传递给子进程(子进程取消它)。如果每个实例都有一个,Manager这将是一个问题,因为它们不可腌制。取消腌制过程的一部分涉及导入定义类的模块并恢复实例的属性(也已腌制)。

\n

Q2 - 如何修复它

\n

您可以通过让类创建自己的类级别Manager(由该类的所有实例共享)来避免该问题。这里,该__init__()方法manager在第一次创建实例时创建类属性,从那时起,其他实例将重用此 \xe2\x80\x94 它有时称为“延迟初始化”

\n
from multiprocessing import Pool, Manager\nimport random\n\n\nclass SomeClass:\n    def __init__(self):\n        # Lazy creation of class attribute.\n        try:\n            manager = getattr(type(self), 'manager')\n        except AttributeError:\n            manager = type(self).manager = Manager()\n        self.dct = manager.dict()\n\n    def __call__(self):\n        with Pool(2) as pool:\n            pool.map(self.do_thing, range(10))\n        print('done')\n\n    def do_thing(self, n):\n        for i in range(10_000_000):\n            i += 1\n        self.dct[n] = random.randint(0, 9)\n\n\nif __name__ == '__main__':\n    inst = SomeClass()\n    inst()\n
Run Code Online (Sandbox Code Playgroud)\n

Q3 - 这是合理的做法吗?

\n

在我看来,是的。

\n