多处理 - 共享一个复杂的对象

mba*_*rov 14 python concurrency multiprocessing

我有一个类似大型dict的对象,需要在许多工作进程之间共享.每个工作者读取对象中信息的随机子集,并使用它进行一些计算.我想避免复制大对象,因为我的机器很快耗尽了内存.

我正在玩这个SO问题的代码,我修改了一下使用固定大小的进程池,这更适合我的用例.然而,这似乎打破了它.

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager

class numeri(object):
    def __init__(self):
        self.nl = []

    def getLen(self):
        return len(self.nl)

    def stampa(self):
        print self.nl

    def appendi(self, x):
        self.nl.append(x)

    def svuota(self):
        for i in range(len(self.nl)):
            del self.nl[0]

class numManager(BaseManager):
    pass

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    return id(listaNumeri)

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                        'svuota', 'stampa'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print id(listaNumeri)

    print '------------ Process'
    for i in range(5):
        producer = Process(target=produce, args=(listaNumeri,))
        producer.start()
        producer.join()

    print '--------------- Pool'
    pool = Pool(processes=1)
    for i in range(5):
        pool.apply_async(produce, args=(listaNumeri,)).get()

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

输出是

4315705168
------------ Process
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
producing 4315705168
--------------- Pool
producing 4299771152
producing 4315861712
producing 4299771152
producing 4315861712
producing 4299771152
Run Code Online (Sandbox Code Playgroud)

如您所见,在第一种情况下,所有工作进程都获得相同的对象(通过id).在第二种情况下,id不一样.这是否意味着对象被复制?

PS我认为不重要,但我正在使用joblib,内部使用的是Pool:

from joblib import delayed, Parallel

print '------------- Joblib'
        Parallel(n_jobs=4)(delayed(produce)(listaNumeri) for i in range(5))
Run Code Online (Sandbox Code Playgroud)

哪个输出:

------------- Joblib
producing 4315862096
producing 4315862288
producing 4315862480
producing 4315862672
producing 4315862352
Run Code Online (Sandbox Code Playgroud)

Tim*_*ers 16

我担心这里几乎没有任何东西可以按你希望的方式工作:-(

首先请注意,不同进程id()生成相同值不会告诉您对象是否真的是同一个对象.每个进程都有自己的虚拟地址空间,由操作系统分配.两个进程中的相同虚拟地址可以指代完全不同的物理内存位置.您的代码是否产生相同的id()输出几乎是偶然的.在多次运行中,有时我会id()在您的Process部分中看到不同的输出,并在您的部分中重复id()输出,Pool反之亦然,或两者兼而有之.

第二,Manager供应语义共享但不是物理共享.您numeri实例的数据仅存在于经理流程中.您的所有工作进程都会查看(副本)代理对象.这些是薄包装器,用于转发由管理器进程执行的所有操作.这涉及到许多进程间通信,以及管理器进程内的序列化.这是写一个非常慢的代码的好方法;-)是的,只有一个numeri数据副本,但所有工作都由一个进程(管理器进程)完成.

要更清楚地看到这一点,请修改@martineau建议的更改,并更改get_list_id()为:

def get_list_id(self):  # added method
    import os
    print("get_list_id() running in process", os.getpid())
    return id(self.nl)
Run Code Online (Sandbox Code Playgroud)

这是示例输出:

41543664
------------ Process
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 46268496
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
producing 44153904
get_list_id() running in process 5856
with list_id 44544608
producing 42262032
get_list_id() running in process 5856
with list_id 44544608
--------------- Pool
producing 41639248
get_list_id() running in process 5856
with list_id 44544608
producing 41777200
get_list_id() running in process 5856
with list_id 44544608
producing 41776816
get_list_id() running in process 5856
with list_id 44544608
producing 41777168
get_list_id() running in process 5856
with list_id 44544608
producing 41777136
get_list_id() running in process 5856
with list_id 44544608
Run Code Online (Sandbox Code Playgroud)

明确?每次获得相同列表ID的原因不是因为每个工作进程都具有相同的self.nl成员,这是因为所有numeri方法都在单个进程(管理器进程)中运行.这就是列表id始终相同的原因.

如果你在Linux-y系统(支持的操作系统fork())上运行,那么更好的想法是忘记所有这些Manager东西并在启动任何工作进程之前在模块级创建复杂对象.然后工作人员将继承复杂对象的(地址空间副本).通常的写时复制fork()语义将使内存效率尽可能高.如果不需要将突变折叠回主程序的复杂对象副本中就足够了.如果确实需要折叠突变,那么你需要进行大量的进程间通信,并且multiprocessing相应地变得不那么有吸引力.

这里没有简单的答案.不要拍信使;-)


Rob*_*tts 5

如果将两行代码添加到代码中,您将发现有关此行为的一些奇异之处:

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    print listaNumeri # <- New line
    return id(listaNumeri)


def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi', 'svuota', 'stampa', 'getAll'])
    mymanager = numManager()
    mymanager.start()
    listaNumeri = mymanager.numeri()
    print listaNumeri # <- New line
    print id(listaNumeri)
Run Code Online (Sandbox Code Playgroud)

这将为您提供以下输出:

<__main__.numeri object at 0x103892990>
4354247888
------------ Process
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
producing 4354247888
<__main__.numeri object at 0x103892990>
--------------- Pool
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>
producing 4354547664
<__main__.numeri object at 0x103892990>
producing 4352988560
<__main__.numeri object at 0x103892990>
Run Code Online (Sandbox Code Playgroud)

如您所见,对象每次都相同,但id并不总是相同。另外,请查看池部分中使用的ID-它在两个ID之间来回切换。

问题的答案是来自期间实际打印__class__属性produce。每次运行,__class__实际上是

<class 'multiprocessing.managers.AutoProxy[numeri]'>
Run Code Online (Sandbox Code Playgroud)

因此,numeri对象AutoProxy每次都被包裹,并且AutoProxy并不总是相同的。但是,numeri被包装的对象在的每次调用中都相同produce。如果您appendi一次调用该方法produce,则listaNumeri在程序末尾将有10个项目。


mar*_*eau 5

您正在将对象实例numeri与其Manager 混淆listaNumeri。这可以通过对代码进行一些小的修改来说明:

首先添加一个get_list_id方法,class numeri(object)以返回使用id的实际内部数据结构的:

    ...                                                   
    def get_list_id(self):  # added method
        return id(self.nl)
Run Code Online (Sandbox Code Playgroud)

然后修改produce()以使用它:

def produce(listaNumeri):
    print 'producing', id(listaNumeri)
    print ' with list_id', listaNumeri.get_list_id()  # added
    return id(listaNumeri)
Run Code Online (Sandbox Code Playgroud)

最后,请确保将新方法公开为numManager接口的一部分:

def main():
    numManager.register('numeri', numeri, exposed=['getLen', 'appendi',
                                                   'svuota', 'stampa',
                                                   'get_list_id'])  # added
    ...                                                   
Run Code Online (Sandbox Code Playgroud)

之后,您将看到类似以下输出的内容:

    ...                                                   
    def get_list_id(self):  # added method
        return id(self.nl)
Run Code Online (Sandbox Code Playgroud)

如此所示,即使Manager每个Pool进程有一个不同的对象,它们都使用(共享)相同的“托管”数据对象。