避免multiprocessing.Pool工作者之间的不可用共享状态的全局变量

zwo*_*wol 9 python fork multiprocessing python-3.x python-multiprocessing

我经常发现自己用Python编写程序来构造一个大的(兆字节)只读数据结构,然后使用该数据结构来分析一个非常大的(总共几百兆字节)小记录列表.每个记录都可以并行分析,因此自然模式是设置只读数据结构并将其分配给全局变量,然后创建一个multiprocessing.Pool(通过隐式将数据结构复制到每个工作进程中fork)和然后用来imap_unordered并行处理记录.这种模式的骨架看起来像这样:

classifier = None
def classify_row(row):
    return classifier.classify(row)

def classify(classifier_spec, data_file):
    global classifier
    try:
        classifier = Classifier(classifier_spec)
        with open(data_file, "rt") as fp, \
             multiprocessing.Pool() as pool:
            rd = csv.DictReader(fp)
            yield from pool.imap_unordered(classify_row, rd)
    finally:
        classifier = None
Run Code Online (Sandbox Code Playgroud)

我因为全局变量之间的隐式耦合的不满意这点classifyclassify_row.理想情况下,我想写

def classify(classifier_spec, data_file):
    classifier = Classifier(classifier_spec)
    with open(data_file, "rt") as fp, \
         multiprocessing.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)
Run Code Online (Sandbox Code Playgroud)

但是这不起作用,因为Classifier对象通常包含无法pickle的对象(因为它们是由作者不关心的扩展模块定义的); 我还读过如果它确实有效会很慢,因为在每次调用绑定方法时,Classifier对象都会被复制到工作进程中.

还有更好的选择吗?我只关心3.x.

Jam*_*Lim 5

这非常棘手.这里的关键是保留对fork时可用而没有序列化的变量的读访问.在多处理中共享内存的大多数解决方案最终都会序列化.我尝试使用a weakref.proxy来传递没有序列化的分类器,但是这不起作用,因为dill和pickle都会尝试跟踪并序列化引用.但是,模块引用工作.

这个组织让我们接近:

import multiprocessing as mp
import csv


def classify(classifier, data_file):

    with open(data_file, "rt") as fp, mp.Pool() as pool:
        rd = csv.DictReader(fp)
        yield from pool.imap_unordered(classifier.classify, rd)


def orchestrate(classifier_spec, data_file):
    # construct a classifier from the spec; note that we can
    # even dynamically import modules here, using config values
    # from the spec
    import classifier_module
    classifier_module.init(classifier_spec)
    return classify(classifier_module, data_file)


if __name__ == '__main__':
    list(orchestrate(None, 'data.txt'))
Run Code Online (Sandbox Code Playgroud)

这里需要注意一些变化:

  • 我们orchestrate为DI优点添加了一种方法; orchestrate指出如何构建/初始化分类器,并将其交给classify两者,将两者分离
  • classify只需要假设classifier参数有一个classify方法; 它不关心它是一个实例还是一个模块

对于这个概念证明,我们提供了一个显然不可序列化的分类器:

# classifier_module.py
def _create_classifier(spec):

    # obviously not pickle-able because it's inside a function
    class Classifier():

        def __init__(self, spec):
            pass

        def classify(self, x):
            print(x)
            return x

    return Classifier(spec)


def init(spec):
    global __classifier
    __classifier = _create_classifier(spec)


def classify(x):
    return __classifier.classify(x)
Run Code Online (Sandbox Code Playgroud)

不幸的是,这里还有一个全局,但它现在很好地封装在模块中作为私有变量,并且模块导出由classifyinit函数组成的紧密接口.

这种设计解锁了一些可能性:

  • orchestrate 可以根据它所看到的内容导入和初始化不同的分类器模块 classifier_spec
  • 一个也可以传递一个Classifier类的实例classify,只要这个实例是可序列化的并且具有相同签名的classify方法


Dar*_*aut 5

如果你想使用分叉,我看不到使用全局的方法。但我也不明白为什么在这种情况下您必须对使用全局感到难过,因为您没有使用多线程等操作全局列表。

不过,可以解决您示例中的丑陋问题。你想直接传递classifier.classify,但该Classifier对象包含无法pickle的对象。

import os
import csv
import uuid
from threading import Lock
from multiprocessing import Pool
from weakref import WeakValueDictionary

class Classifier:

    def __init__(self, spec):
        self.lock = Lock()  # unpickleable
        self.spec = spec

    def classify(self, row):
        return f'classified by pid: {os.getpid()} with spec: {self.spec}', row
Run Code Online (Sandbox Code Playgroud)

我建议我们子类化Classifier并定义__getstate____setstate__启用酸洗。由于无论如何您都在使用分叉,因此它必须腌制的所有状态都是如何获取对分叉全局实例的引用的信息。然后,我们只需__dict__使用__dict__分叉实例(尚未经过酸洗减少)更新酸洗对象,您的实例就再次完成。

为了在没有额外样板的情况下实现这一点,子类化Classifier实例必须为自己生成一个名称并将其注册为全局变量。第一个引用将是弱引用,因此可以在用户期望时对实例进行垃圾收集。第二个引用是由用户在分配时创建的classifier = Classifier(classifier_spec)。这个,不一定是全球性的。

下面示例中的生成名称是在 standard-libuuid模块的帮助下生成的。uuid 被转换为字符串并编辑为有效的标识符(这不是必须的,但在交互模式下调试很方便)。

class SubClassifier(Classifier):

    def __init__(self, spec):
        super().__init__(spec)
        self.uuid = self._generate_uuid_string()
        self.pid = os.getpid()
        self._register_global()

    def __getstate__(self):
        """Define pickled content."""
        return {'uuid': self.uuid}

    def __setstate__(self, state):
        """Set state in child process."""
        self.__dict__ = state
        self.__dict__.update(self._get_instance().__dict__)

    def _get_instance(self):
        """Get reference to instance."""
        return globals()[self.uuid][self.uuid]

    @staticmethod
    def _generate_uuid_string():
        """Generate id as valid identifier."""
        # return 'uuid_' + '123' # for testing
        return 'uuid_' + str(uuid.uuid4()).replace('-', '_')

    def _register_global(self):
        """Register global reference to instance."""
        weakd = WeakValueDictionary({self.uuid: self})
        globals().update({self.uuid: weakd})

    def __del__(self):
        """Clean up globals when deleted in parent."""
        if os.getpid() == self.pid:
            globals().pop(self.uuid)
Run Code Online (Sandbox Code Playgroud)

令人高兴的是,样板文件完全消失了。您不必手动声明和删除全局变量,因为实例会在后台自行管理所有内容:

def classify(classifier_spec, data_file, n_workers):
    classifier = SubClassifier(classifier_spec)
    # assert globals()['uuid_123']['uuid_123'] # for testing
    with open(data_file, "rt") as fh, Pool(n_workers) as pool:
        rd = csv.DictReader(fh)
        yield from pool.imap_unordered(classifier.classify, rd)


if __name__ == '__main__':

    PATHFILE = 'data.csv'
    N_WORKERS = 4

    g = classify(classifier_spec='spec1', data_file=PATHFILE, n_workers=N_WORKERS)
    for record in g:
        print(record)

   # assert 'uuid_123' not in globals() # no reference left
Run Code Online (Sandbox Code Playgroud)