多处理管理器和自定义类

Ser*_*lda 7 python multiprocessing

我不知道为什么,但是每当我尝试传递给共享对象共享自定义类对象的方法时,我都会收到这个奇怪的错误。Python版本:3.6.3

代码:

from multiprocessing.managers import SyncManager

class MyManager(SyncManager): pass
class MyClass: pass

class Wrapper:
    def set(self, ent):
        self.ent = ent

MyManager.register('MyClass', MyClass)
MyManager.register('Wrapper', Wrapper)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()

    try:
        obj = manager.MyClass()
        lst = manager.list([1,2,3])

        collection = manager.Wrapper()
        collection.set(lst) # executed fine
        collection.set(obj) # raises error
    except Exception as e:
        raise
Run Code Online (Sandbox Code Playgroud)

错误:

---------------------------------------------------------------------------
Traceback (most recent call last):
  File "D:\Program Files\Python363\lib\multiprocessing\managers.py", line 228, in serve_client
    request = recv()
  File "D:\Program Files\Python363\lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "D:\Program Files\Python363\lib\multiprocessing\managers.py", line 881, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
TypeError: AutoProxy() got an unexpected keyword argument 'manager_owned'
---------------------------------------------------------------------------
Run Code Online (Sandbox Code Playgroud)

这里有什么问题?

Mar*_*ers 10

如前所述,我也遇到了这个问题,这是 Python 中的一个错误multiprocessing(请参阅问题 #30256),纠正此问题拉取请求尚未合并。此拉取请求已被另一个 PR取代,该 PR进行了相同的更改,但也添加了一个测试。

除了手动修补本地安装之外,您还有其他三个选项:

  • 你可以使用MakeProxyType()callable 来指定你的AutoProxy代理类型,而不依赖于代理生成器,
  • 你可以定义一个自定义代理类,
  • 你可以用monkeypatch修补这个bug

在解释了这些选项的作用之后,我将在下面描述这些选项AutoProxy

AutoProxy上课有什么意义

多处理Manager模式通过将所有值放在相同的专用“规范值服务器”进程中来访问共享值。所有其他进程(客户端)通过代理与服务器通信,然后与服务器来回传递消息。

服务器确实需要知道哪些方法可以用于对象类型,但是,客户端可以使用相同的方法生成代理对象。这就是AutoProxy对象的用途。每当客户端需要注册类的新实例时,客户端创建的默认代理是AutoProxy,然后要求服务器告诉它可以使用哪些方法。

一旦有了方法名称,它就会调用MakeProxyType构造一个新类,然后为该类创建一个实例以返回。

所有这些都被推迟到您确实需要代理类型的实例时,因此AutoProxy如果您不使用已注册的某些类,原则上可以节省一点内存。然而,它的内存非常少,缺点是这个过程必须在每个客户端进程中进行。

这些代理对象使用引用计数来跟踪服务器何时可以删除规范值。正是在AutoProxy可调用文件中被破坏的那部分;当在服务器进程中而不是在客户端中创建代理对象时,将一个新参数传递给代理类型以禁用引用计数,但AutoProxy类型未更新以支持此功能。

那么,你怎么能解决这个问题?以下是这 3 个选项:

使用MakeProxyType()可调用

如前所述,AutoProxy实际上只是调用(通过服务器)来获取该类型的公共方法,以及调用MakeProxyType(). 您可以在注册时自己拨打这些电话。

所以,而不是

from multiprocessing.managers import SyncManager
SyncManager.register("YourType", YourType)
Run Code Online (Sandbox Code Playgroud)

from multiprocessing.managers import SyncManager, MakeProxyType, public_methods
#               arguments:    classname,  sequence of method names
YourTypeProxy = MakeProxyType("YourType", public_methods(YourType))
SyncManager.register("YourType", YourType, YourTypeProxy)
Run Code Online (Sandbox Code Playgroud)

随意在MakeProxyType()那里内联呼叫。

如果您使用exposed参数 to SyncManager.register(),则应将这些名称传递给MakeProxyType

# SyncManager.register("YourType", YourType, exposed=("foo", "bar"))
# becomes
YourTypeProxy = MakeProxyType("YourType", ("foo", "bar"))
SyncManager.register("YourType", YourType, YourTypeProxy)
Run Code Online (Sandbox Code Playgroud)

您也必须对所有预注册的类型执行此操作:

from multiprocessing.managers import SyncManager, AutoProxy, MakeProxyType, public_methods

registry = SyncManager._registry
for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
    if proxytype is not AutoProxy:
        continue
    create_method = hasattr(managers.SyncManager, typeid)
    if exposed is None:
        exposed = public_methods(callable) 
    SyncManager.register(
        typeid,
        callable=callable,
        exposed=exposed,
        method_to_typeid=method_to_typeid,
        proxytype=MakeProxyType(f"{typeid}Proxy", exposed),
        create_method=create_method,
    )
Run Code Online (Sandbox Code Playgroud)

创建自定义代理

不能依赖多处理为您创建代理。你可以只写你自己的。除了特殊的“托管值”服务器进程之外,所有进程都使用代理,并且代理应该来回传递消息。当然,这不是已注册类型的选项,但我在这里提到它是因为对于您自己的类型,这提供了优化的机会。

请注意,你应该有方法的所有互动,需要回到“经典”值实例,因此你需要使用属性来处理正常的属性或添加__getattr____setattr____delattr__方法的需要。

优点是您可以非常精细地控制哪些方法实际需要与服务器进程交换数据;在我的具体示例中,我的代理类缓存了不可变的信息(一旦创建对象,这些值就永远不会改变),但经常使用。这包括控制其他方法是否会做某事的标志值,因此代理可以只检查标志值,如果未设置,则不与服务器进程对话。像这样的东西:

class FooProxy(BaseProxy):
    # what methods the proxy is allowed to access through calls
    _exposed_ = ("__getattribute__", "expensive_method", "spam")

    @property
    def flag(self):
        try:
            v = self._flag
        except AttributeError:
            # ask for the value from the server, "realvalue.flag"
            # use __getattribute__ because it's an attribute, not a property
            v = self._flag = self._callmethod("__getattribute__", ("flag",))
        return flag

    def expensive_method(self, *args, **kwargs):
        if self.flag:   # cached locally!
            return self._callmethod("expensive_method", args, kwargs)

    def spam(self, *args, **kwargs):
        return self._callmethod("spam", args, kwargs

SyncManager.register("Foo", Foo, FooProxy)
Run Code Online (Sandbox Code Playgroud)

因为MakeProxyType()返回一个BaseProxy子类,您可以将该类与自定义子类结合使用,这样您就不必编写任何仅包含以下内容的方法return self._callmethod(...)

# a base class with the methods generated for us. The second argument
# doubles as the 'permitted' names, stored as _exposed_
FooProxyBase = MakeProxyType(
    "FooProxyBase",
    ("__getattribute__", "expensive_method", "spam"),
)

class FooProxy(FooProxyBase):
    @property
    def flag(self):
        try:
            v = self._flag
        except AttributeError:
            # ask for the value from the server, "realvalue.flag"
            # use __getattribute__ because it's an attribute, not a property
            v = self._flag = self._callmethod("__getattribute__", ("flag",))
        return flag

    def expensive_method(self, *args, **kwargs):
        if self.flag:   # cached locally!
            return self._callmethod("expensive_method", args, kwargs)

    def spam(self, *args, **kwargs):
        return self._callmethod("spam", args, kwargs

SyncManager.register("Foo", Foo, FooProxy)
Run Code Online (Sandbox Code Playgroud)

同样,这不会解决嵌套在其他代理值中的标准类型的问题。

应用猴子补丁

我使用它来修复AutoProxy可调用对象,当您运行已将修复程序应用于源代码的 Python 版本时,这应该会自动避免修补:

# Backport of https://github.com/python/cpython/pull/4819
# Improvements to the Manager / proxied shared values code
# broke handling of proxied objects without a custom proxy type,
# as the AutoProxy function was not updated.
#
# This code adds a wrapper to AutoProxy if it is missing the
# new argument.

import logging
from inspect import signature
from functools import wraps
from multiprocessing import managers


logger = logging.getLogger(__name__)
orig_AutoProxy = managers.AutoProxy


@wraps(managers.AutoProxy)
def AutoProxy(*args, incref=True, manager_owned=False, **kwargs):
    # Create the autoproxy without the manager_owned flag, then
    # update the flag on the generated instance. If the manager_owned flag
    # is set, `incref` is disabled, so set it to False here for the same
    # result.
    autoproxy_incref = False if manager_owned else incref
    proxy = orig_AutoProxy(*args, incref=autoproxy_incref, **kwargs)
    proxy._owned_by_manager = manager_owned
    return proxy


def apply():
    if "manager_owned" in signature(managers.AutoProxy).parameters:
        return

    logger.debug("Patching multiprocessing.managers.AutoProxy to add manager_owned")
    managers.AutoProxy = AutoProxy

    # re-register any types already registered to SyncManager without a custom
    # proxy type, as otherwise these would all be using the old unpatched AutoProxy
    SyncManager = managers.SyncManager
    registry = managers.SyncManager._registry
    for typeid, (callable, exposed, method_to_typeid, proxytype) in registry.items():
        if proxytype is not orig_AutoProxy:
            continue
        create_method = hasattr(managers.SyncManager, typeid)
        SyncManager.register(
            typeid,
            callable=callable,
            exposed=exposed,
            method_to_typeid=method_to_typeid,
            create_method=create_method,
        )
Run Code Online (Sandbox Code Playgroud)

导入上述内容并调用apply()函数来修复multiprocessing启动管理器服务器之前执行此操作!

  • 这个答案令人印象深刻。令人有些尴尬的是,这个问题多年来一直没有得到解决,尤其是多处理由于能够绕过 GIL 而成为 Python 中越来越重要的一部分。 (3认同)

Luc*_*tti 6

解决方案编辑多处理源代码

Sergey的原始答案要求您编辑多处理源代码,如下所示:

  1. 找到您的多处理包(我的,通过Anaconda安装,在 中/anaconda3/lib/python3.6/multiprocessing)。
  2. 打开 managers.py
  3. 将 key 参数添加manager_owned=TrueAutoProxy函数中。

原始自动代理:

def AutoProxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True):
    ...
Run Code Online (Sandbox Code Playgroud)

编辑自动代理:

def AutoProxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True, manager_owned=True):
    ...
Run Code Online (Sandbox Code Playgroud)

在运行时通过代码解决

我设法解决了意外的关键字参数TypeError 异常,而无需直接编辑多处理的源代码,而是在我使用多处理管理器的地方添加以下几行代码:

import multiprocessing

# Backup original AutoProxy function
backup_autoproxy = multiprocessing.managers.AutoProxy

# Defining a new AutoProxy that handles unwanted key argument 'manager_owned'
def redefined_autoproxy(token, serializer, manager=None, authkey=None,
          exposed=None, incref=True, manager_owned=True):
    # Calling original AutoProxy without the unwanted key argument
    return backup_autoproxy(token, serializer, manager, authkey,
                     exposed, incref)

# Updating AutoProxy definition in multiprocessing.managers package
multiprocessing.managers.AutoProxy = redefined_autoproxy
Run Code Online (Sandbox Code Playgroud)