使用多处理Pool.map()时无法pickle <type'instancemethod'>

ven*_*lin 206 python multithreading pool pickle multiprocessing

我想使用multiprocessingPool.map()功能,同时划分出工作.当我使用以下代码时,它工作正常:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()
Run Code Online (Sandbox Code Playgroud)

但是,当我在面向对象的方法中使用它时,它不起作用.它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
Run Code Online (Sandbox Code Playgroud)

当以下是我的主程序时会发生这种情况:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()
Run Code Online (Sandbox Code Playgroud)

以下是我的someClass课程:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))
Run Code Online (Sandbox Code Playgroud)

任何人都知道问题可能是什么,或者一个简单的方法呢?

Ale*_*lli 117

问题是多处理必须腌制东西以在进程之间吊索它们,并且绑定方法不可挑选.解决方法(无论你认为它是否"简单";-)是将基础结构添加到程序中以允许这些方法被pickle,并使用copy_reg标准库方法注册它.

例如,Steven Bethard 对此线程的贡献(在线程结束时)显示了一种完全可行的方法,允许方法pickle/unpickling via copy_reg.

  • [对于超级超级懒人](http://stackoverflow.com/a/7309686/247542),看到唯一的答案,困扰发布实际的非破坏代码... (13认同)
  • [对于懒惰](http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods#edit2155350) (7认同)
  • 解决/规避酸洗问题的另一种方法是使用莳萝,请参阅我的答案 http://stackoverflow.com/questions/8804830/python-multiprocessing-pickling-error/24673524#24673524 (2认同)

Mik*_*rns 70

所有这些解决方案都很难看,因为除非您跳出标准库,否则多处理和酸洗会被破坏和限制.

如果使用multiprocessing被调用的fork pathos.multiprocesssing,则可以直接在多处理map函数中使用类和类方法.这是因为dill使用而不是picklecPickle,并且dill可以序列化python中的几乎任何东西.

pathos.multiprocessing还提供了一个异步映射函数......它可以map使用多个参数(例如map(math.pow, [1,2,3], [4,5,6]))

请参阅: 多处理和莳萝可以一起做什么?

和:http: //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
Run Code Online (Sandbox Code Playgroud)

而且只是为了明确,你可以完全想要你想做的事情,如果你愿意的话,你可以从翻译中做到.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 
Run Code Online (Sandbox Code Playgroud)

获取代码:https: //github.com/uqfoundation/pathos

  • 我是`pathos`的作者.您所指的版本已有几年历史了.试试github上的版本,您可以使用`pathos.pp`或https://github.com/uqfoundation/ppft. (9认同)
  • 你可以根据pathos.pp更新这个答案,因为pathos.multiprocessing不再存在了吗? (3认同)
  • 首先是`pip install setuptools`,然后是`pip install git + https:// github.com/uqfoundation/pathos.git @ master`.这将获得适当的依赖项.一个新版本已经准备就绪......现在,`pathos`中的几乎所有内容都在Windows上运行,并且与`3.x`兼容. (3认同)

dor*_*vak 33

您还可以__call__()在您的内部定义一个方法someClass(),该方法调用someClass.go()然后将一个实例传递someClass()给池.这个对象是pickleable,它工作正常(对我来说)......

  • 要记住的另一个细节是它只是*对象(类实例)被腌制而不是类本身.因此,如果您已从其默认值更改任何类属性,则这些更改将不会传播到不同的进程.解决方法是确保将函数所需的所有内容存储为实例属性. (6认同)
  • 这比Alex Martelli提出的技术要容易得多,但是您只能在每个类中向您的多处理池发送一个方法. (3认同)
  • @dorvak你能用`__call __()`来展示一个简单的例子吗?我认为你的答案可能更清晰 - 我正在努力理解这个错误,我第一次来看电话.顺便说一句,这个答案也有助于澄清多处理的作用:[http://stackoverflow.com/a/20789937/305883] (2认同)

Eri*_* H. 20

虽然Steven Bethard的解决方案存在一些局限性:

当您将类方法注册为函数时,每次完成方法处理时,都会令人惊讶地调用类的析构函数.因此,如果您的类的一个实例调用其方法的n倍,则成员可能会在两次运行之间消失,并且您可能会收到一条消息malloc: *** error for object 0x...: pointer being freed was not allocated(例如,打开的成员文件)或pure virtual method called, terminate called without an active exception(这比我使用的成员对象的生命周期短于我的想法).当处理大于池大小的n时,我得到了这个.这是一个简短的例子:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
Run Code Online (Sandbox Code Playgroud)

输出:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
Run Code Online (Sandbox Code Playgroud)

__call__方法不是等价的,因为从结果中读取了[None,...]:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !
Run Code Online (Sandbox Code Playgroud)

所以这两种方法都不令人满意......

  • 你得到`None`,因为`__call__`的定义缺少`return`:它应该是`return self.process_obj(i)`. (7认同)

tor*_*rek 14

您可以使用另一个快捷方式,但根据类实例中的内容,它可能效率低下.

正如大家所说的那样,问题在于multiprocessing代码必须腌制它发送给它已启动的子进程的东西,并且pickler不会执行实例方法.

但是,您可以将实际的类实例以及要调用的函数的名称发送到普通函数,然后用于getattr调用实例方法,从而在Pool子进程中创建绑定方法,而不是发送实例方法.这与定义__call__方法类似,只是您可以调用多个成员函数.

从他的答案中窃取@ EricH.的代码并对其进行注释(我重新输入了所有的名称更改,因此出于某种原因,这似乎比剪切和粘贴更容易:-))用于说明所有魔法:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)
Run Code Online (Sandbox Code Playgroud)

输出显示,实际上,构造函数被调用一次(在原始pid中)并且析构函数被调用9次(每个副本执行一次=每个pool-worker-process需要2或3次,在原始版本中加一次处理).这通常是正常的,因为在这种情况下,由于默认选择器生成整个实例的副本并且(半)秘密地重新填充它 - 在这种情况下,执行:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
Run Code Online (Sandbox Code Playgroud)

- 这就是为什么即使析构函数在三个工作进程中被调用八次,它每次从1倒数到0 - 但当然你仍然可以通过这种方式遇到麻烦.如有必要,您可以提供自己的__setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']
Run Code Online (Sandbox Code Playgroud)

例如,在这种情况下.


par*_*ohn 11

您还可以__call__()在您的内部定义一个方法someClass(),该方法调用someClass.go()然后将一个实例传递someClass()给池.这个对象是pickleable,它工作正常(对我来说)......

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()
Run Code Online (Sandbox Code Playgroud)