在不同进程之间共享内存中的复杂 python 对象

Meg*_*ega 7 pickle shared-memory python-3.x

我有一个复杂的 python 对象,内存大小约为 36GB,我想在多个单独的 python 进程之间共享它。它作为 pickle 文件存储在磁盘上,目前我为每个进程单独加载该文件。我想共享这个对象,以便在可用内存量的情况下并行执行更多进程。

从某种意义上说,该对象用作只读数据库。每个进程每秒都会发起多次访问请求,而每次请求只是针对一小部分数据。

我研究了像 Radis 这样的解决方案,但我发现最终数据需要序列化为简单的文本形式。此外,将 pickle 文件本身映射到内存应该没有帮助,因为每个进程都需要提取它。所以我想到了另外两种可能的解决方案:

  1. 使用共享内存,每个进程都可以访问存储对象的地址。这里的问题是进程只会看到大量字节,无法解释这些字节
  2. 编写一段代码来保存该对象并通过 API 调用管理数据检索。在这里,我想知道这种解决方案在速度方面的表现。

有没有一种简单的方法来实施这些解决方案?也许对于这种情况有更好的解决方案?

非常感谢!

biv*_*ac0 6

对于复杂的对象,没有现成的方法可以在进程之间直接共享内存。如果你有简单的方法,ctypes你可以在 c 风格的共享内存中执行此操作,但它不会直接映射到 python 对象。

如果您在任何时候只需要一部分数据而不是整个 36GB,那么有一个简单的解决方案非常有效。为此,您可以使用SyncManagerfrom multiprocessing.managers。使用它,您可以设置一个为您的数据提供代理类的服务器(您的数据不存储在该类中,代理仅提供对它的访问)。然后,您的客户端使用 a 连接到服务器BaseManager并调用代理类中的方法来检索数据。

在幕后,这些Manager类负责对您请求的数据进行腌制,并通过开放端口将其从服务器发送到客户端。因为每次调用都会对数据进行酸洗,所以如果您需要整个数据集,则效率不高。在客户端只需要一小部分数据的情况下,该方法可以节省大量时间,因为数据只需要由服务器加载一次。

该解决方案在速度方面与数据库解决方案相当,但如果您希望保留纯粹的 Python 解决方案,它可以为您节省大量复杂性和 DB 学习。

以下是一些旨在与 GloVe 词向量配合使用的示例代码。

服务器

#!/usr/bin/python
import  sys
from    multiprocessing.managers import SyncManager
import  numpy

# Global for storing the data to be served
gVectors = {}

# Proxy class to be shared with different processes
# Don't but the big vector data in here since that will force it to 
# be piped to the other process when instantiated there, instead just
# return the global vector data, from this process, when requested.
class GloVeProxy(object):
    def __init__(self):
        pass

    def getNVectors(self):
        global gVectors
        return len(gVectors)

    def getEmpty(self):
        global gVectors
        return numpy.zeros_like(gVectors.values()[0])

    def getVector(self, word, default=None):
        global gVectors
        return gVectors.get(word, default)

# Class to encapsulate the server functionality
class GloVeServer(object):
    def __init__(self, port, fname):
        self.port = port
        self.load(fname)

    # Load the vectors into gVectors (global)
    @staticmethod
    def load(filename):
        global gVectors
        f = open(filename, 'r')
        for line in f:
            vals = line.rstrip().split(' ')
            gVectors[vals[0]] = numpy.array(vals[1:]).astype('float32')

    # Run the server
    def run(self):
        class myManager(SyncManager): pass  
        myManager.register('GloVeProxy', GloVeProxy)
        mgr = myManager(address=('', self.port), authkey='GloVeProxy01')
        server = mgr.get_server()
        server.serve_forever()

if __name__ == '__main__':
    port  = 5010
    fname = '/mnt/raid/Data/Misc/GloVe/WikiGiga/glove.6B.50d.txt'

    print 'Loading vector data'
    gs = GloVeServer(port, fname)

    print 'Serving data. Press <ctrl>-c to stop.'
    gs.run()
Run Code Online (Sandbox Code Playgroud)

客户

from   multiprocessing.managers import BaseManager
import psutil   #3rd party module for process info (not strictly required)

# Grab the shared proxy class.  All methods in that class will be availble here
class GloVeClient(object):
    def __init__(self, port):
        assert self._checkForProcess('GloVeServer.py'), 'Must have GloVeServer running'
        class myManager(BaseManager): pass
        myManager.register('GloVeProxy')
        self.mgr = myManager(address=('localhost', port), authkey='GloVeProxy01')
        self.mgr.connect()
        self.glove = self.mgr.GloVeProxy()

    # Return the instance of the proxy class
    @staticmethod
    def getGloVe(port):
        return GloVeClient(port).glove

    # Verify the server is running
    @staticmethod
    def _checkForProcess(name):
        for proc in psutil.process_iter():
            if proc.name() == name:
                return True
        return False

if __name__ == '__main__':
    port = 5010
    glove = GloVeClient.getGloVe(port)

    for word in ['test', 'cat', '123456']:
        print('%s = %s' % (word, glove.getVector(word)))
Run Code Online (Sandbox Code Playgroud)

请注意,该psutil库仅用于检查服务器是否正在运行,这不是必需的。请务必为服务器命名GloVeServer.py或更改代码中的检查方式psutil,以便它查找正确的名称。