多处理在进程之间共享不可序列化的对象

Use*_*ser 13 python proxy multiprocessing python-3.x concurrent.futures

有三个问题可能重复(但过于具体):

通过回答这个问题,可以回答所有其他三个问题.希望我能说清楚:

一旦我在多处理创建的某个进程中创建了一个对象:

  1. 如何将对该对象的引用传递给其他进程?
  2. (不是那么重要)我如何确保在持有参考时这个过程不会消失?

例1(已解决)

from concurrent.futures import *

def f(v):
    return lambda: v * v

if __name__ == '__main__':
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
        l = list(e.map(f, [1,2,3,4]))
    print([g() for g in l]) # [1, 4, 9, 16]
Run Code Online (Sandbox Code Playgroud)

例2

假设f返回一个具有可变状态的对象.应该可以从其他进程访问此相同的对象.

例3

我有一个具有打开文件和锁的对象 - 如何授予对其他进程的访问权限?

提醒

我不希望出现此特定错误.或者是这个特定用例的解决方案.解决方案应足够通用,以便在进程之间共享不可移动的对象.可以在任何进程中创建对象.使所有对象可移动并保持身份的解决方案也可以是好的.

任何提示都是受欢迎的,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的.所以我们可以一起创建解决方案.

以下是尝试解决此问题,但没有多处理:https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

你希望其他进程对引用做什么?

引用可以传递给使用多处理创建的任何其他进程(重复3).可以访问属性,调用引用.访问的属性可能是也可能不是代理.

使用代理有什么问题?

也许没有问题,但挑战.我的印象是代理有一个管理器,管理器有自己的进程,因此必须序列化和转移不可序列化的对象(部分用StacklessPython/fork解决).还存在特殊对象的代理 - 为所有对象(可解决的)构建代理很难但不是不可能的.

解? - 代理+经理?

Eric Urban表明序列化不是问题.真正的挑战在于Example2&3:状态的同步.我对解决方案的想法是为经理创建一个特殊的代理类.这个代理类

  1. 为不可序列化的对象采用构造函数
  2. 获取可序列化对象并将其传输到管理器进程.
  3. (问题)根据1.必须在经理过程中创建不可序列化的对象.

Kri*_*itz 10

大多数情况下,不希望将现有对象的引用传递给另一个进程.而是创建要在进程之间共享的类:

class MySharedClass:
    # stuff...
Run Code Online (Sandbox Code Playgroud)

然后你创建一个这样的代理管理器:

import multiprocessing.managers as m
class MyManager(m.BaseManager):
    pass # Pass is really enough. Nothing needs to be done here.
Run Code Online (Sandbox Code Playgroud)

然后在该Manager上注册您的课程,如下所示:

MyManager.register("MySharedClass", MySharedClass)
Run Code Online (Sandbox Code Playgroud)

然后,一旦管理器实例化并启动,manager.start()您就可以创建类的共享实例manager.MySharedClass.这应该适用于所有需求.返回的代理与原始对象的工作方式完全相同,除了文档中描述的一些例外情况.


Kri*_*itz 5

在阅读此答案之前,请注意其中解释的解决方案很糟糕。请注意答案末尾的警告。

我找到了一种通过共享对象状态的方法multiprocessing.Array。因此,我创建了一个通过所有进程透明地共享其状态的类:

import multiprocessing as m
import pickle

class Store:
    pass

class Shareable:
    def __init__(self, size = 2**10):
        object.__setattr__(self, 'store', m.Array('B', size))
        o = Store() # This object will hold all shared values
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

    def __getattr__(self, name):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        return getattr(o, name)

    def __setattr__(self, name, value):
        s = load(object.__getattribute__(self, 'store'))
        o = pickle.loads(s)
        setattr(o, name, value)
        s = pickle.dumps(o)
        store(object.__getattribute__(self, 'store'), s)

def store(arr, s):
    for i, ch in enumerate(s):
        arr[i] = ch

def load(arr):
    l = arr[:]
    return bytes(arr)
Run Code Online (Sandbox Code Playgroud)

您可以将此类(及其子类)的实例传递给任何其他进程,它将通过所有进程同步其状态。这是用以下代码测试的:

class Foo(Shareable):
    def __init__(self):
        super().__init__()
        self.f = 1

    def foo(self):
        self.f += 1

def f(s):
    s.f += 1

if __name__ == '__main__':
    import multiprocessing as m
    import time
    s = Foo()
    print(s.f)
    p = m.Process(target=f, args=(s,))
    p.start()
    time.sleep(1)
    print(s.f)
Run Code Online (Sandbox Code Playgroud)

该类的“魔力”在于它将所有属性存储在该类的另一个实例中Store。这堂课并不是很特别。它只是一些可以具有任意属性的类。(听写也可以。)

然而,这个类有一些非常令人讨厌的怪癖。我找到了两个。

第一个怪癖是您必须指定Store实例最多占用多少空间。这是因为multiprocessing.Array具有静态尺寸。所以里面能pickle的对象只能和数组一样大。

第二个怪癖是您不能将此类与 ProcessPoolExecutors 或简单的池一起使用。如果您尝试这样做,您会收到错误消息:

>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
...     e.submit(f, args=(s,))
... 
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)

警告
您可能不应该使用这种方法,因为它使用无法控制的内存量,与使用代理相比过于复杂(请参阅我的其他答案),并且可能会以惊人的方式崩溃。