Use*_*ser 13 python proxy multiprocessing python-3.x concurrent.futures
有三个问题可能重复(但过于具体):
通过回答这个问题,可以回答所有其他三个问题.希望我能说清楚:
一旦我在多处理创建的某个进程中创建了一个对象:
例1(已解决)
from concurrent.futures import *
def f(v):
return lambda: v * v
if __name__ == '__main__':
with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor
l = list(e.map(f, [1,2,3,4]))
print([g() for g in l]) # [1, 4, 9, 16]
Run Code Online (Sandbox Code Playgroud)
例2
假设f
返回一个具有可变状态的对象.应该可以从其他进程访问此相同的对象.
例3
我有一个具有打开文件和锁的对象 - 如何授予对其他进程的访问权限?
提醒
我不希望出现此特定错误.或者是这个特定用例的解决方案.解决方案应足够通用,以便在进程之间共享不可移动的对象.可以在任何进程中创建对象.使所有对象可移动并保持身份的解决方案也可以是好的.
任何提示都是受欢迎的,任何指向如何实现解决方案的部分解决方案或代码片段都是值得的.所以我们可以一起创建解决方案.
以下是尝试解决此问题,但没有多处理:https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst
问题
你希望其他进程对引用做什么?
引用可以传递给使用多处理创建的任何其他进程(重复3).可以访问属性,调用引用.访问的属性可能是也可能不是代理.
使用代理有什么问题?
也许没有问题,但挑战.我的印象是代理有一个管理器,管理器有自己的进程,因此必须序列化和转移不可序列化的对象(部分用StacklessPython/fork解决).还存在特殊对象的代理 - 为所有对象(可解决的)构建代理很难但不是不可能的.
解? - 代理+经理?
Eric Urban表明序列化不是问题.真正的挑战在于Example2&3:状态的同步.我对解决方案的想法是为经理创建一个特殊的代理类.这个代理类
Kri*_*itz 10
大多数情况下,不希望将现有对象的引用传递给另一个进程.而是创建要在进程之间共享的类:
class MySharedClass:
# stuff...
Run Code Online (Sandbox Code Playgroud)
然后你创建一个这样的代理管理器:
import multiprocessing.managers as m
class MyManager(m.BaseManager):
pass # Pass is really enough. Nothing needs to be done here.
Run Code Online (Sandbox Code Playgroud)
然后在该Manager上注册您的课程,如下所示:
MyManager.register("MySharedClass", MySharedClass)
Run Code Online (Sandbox Code Playgroud)
然后,一旦管理器实例化并启动,manager.start()
您就可以创建类的共享实例manager.MySharedClass
.这应该适用于所有需求.返回的代理与原始对象的工作方式完全相同,除了文档中描述的一些例外情况.
在阅读此答案之前,请注意其中解释的解决方案很糟糕。请注意答案末尾的警告。
我找到了一种通过共享对象状态的方法multiprocessing.Array
。因此,我创建了一个通过所有进程透明地共享其状态的类:
import multiprocessing as m
import pickle
class Store:
pass
class Shareable:
def __init__(self, size = 2**10):
object.__setattr__(self, 'store', m.Array('B', size))
o = Store() # This object will hold all shared values
s = pickle.dumps(o)
store(object.__getattribute__(self, 'store'), s)
def __getattr__(self, name):
s = load(object.__getattribute__(self, 'store'))
o = pickle.loads(s)
return getattr(o, name)
def __setattr__(self, name, value):
s = load(object.__getattribute__(self, 'store'))
o = pickle.loads(s)
setattr(o, name, value)
s = pickle.dumps(o)
store(object.__getattribute__(self, 'store'), s)
def store(arr, s):
for i, ch in enumerate(s):
arr[i] = ch
def load(arr):
l = arr[:]
return bytes(arr)
Run Code Online (Sandbox Code Playgroud)
您可以将此类(及其子类)的实例传递给任何其他进程,它将通过所有进程同步其状态。这是用以下代码测试的:
class Foo(Shareable):
def __init__(self):
super().__init__()
self.f = 1
def foo(self):
self.f += 1
def f(s):
s.f += 1
if __name__ == '__main__':
import multiprocessing as m
import time
s = Foo()
print(s.f)
p = m.Process(target=f, args=(s,))
p.start()
time.sleep(1)
print(s.f)
Run Code Online (Sandbox Code Playgroud)
该类的“魔力”在于它将所有属性存储在该类的另一个实例中Store
。这堂课并不是很特别。它只是一些可以具有任意属性的类。(听写也可以。)
然而,这个类有一些非常令人讨厌的怪癖。我找到了两个。
第一个怪癖是您必须指定Store
实例最多占用多少空间。这是因为multiprocessing.Array
具有静态尺寸。所以里面能pickle的对象只能和数组一样大。
第二个怪癖是您不能将此类与 ProcessPoolExecutors 或简单的池一起使用。如果您尝试这样做,您会收到错误消息:
>>> s = Foo()
>>> with ProcessPoolExecutor(1) as e:
... e.submit(f, args=(s,))
...
<Future at 0xb70fe20c state=running>
Traceback (most recent call last):
<omitted>
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance
Run Code Online (Sandbox Code Playgroud)
警告
您可能不应该使用这种方法,因为它使用无法控制的内存量,与使用代理相比过于复杂(请参阅我的其他答案),并且可能会以惊人的方式崩溃。
归档时间: |
|
查看次数: |
7533 次 |
最近记录: |