Python多处理PicklingError:无法pickle <type'function'>

Ven*_*tta 217 python pickle multiprocessing python-multiprocessing

很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂而无法发布.如果我在IPython shell而不是常规Python中运行程序,那么事情就会很顺利.

我查看了之前关于这个问题的一些注意事项.它们都是由在类函数中定义的pool to call函数引起的.但对我来说情况并非如此.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)

我将不胜感激任何帮助.

更新:我挑选的功能是在模块的顶层定义的.虽然它调用包含嵌套函数的函数.即f()要求g()调用h()具有嵌套函数i(),和我打电话pool.apply_async(f).f(),g(),h()都在顶层定义.我用这个模式尝试了更简单的例子,但它确实有效.

unu*_*tbu 271

这是一个可以腌制清单.特别是,如果函数在模块的顶层定义,则它们只能被选中.

这段代码:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

产生的错误几乎与您发布的错误相同:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)

问题是这些pool方法都使用a mp.SimpleQueue将任务传递给工作进程.经过的所有东西mp.SimpleQueue必须是可挑选的,并且foo.work不可挑选,因为它没有在模块的顶层定义.

它可以通过在顶层定义一个函数来修复,该函数调用foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))
Run Code Online (Sandbox Code Playgroud)

请注意,这foo是可选择的,因为它Foo是在顶层定义的并且 foo.__dict__是可选择的.

  • 要获得PicklingError,必须在Queue上放置一些不可选择的东西.它可能是函数或其参数.为了找到关于这个问题的更多信息,我建议复制你的程序,并开始削减它,使其更简单,更简单,每次重新运行程序以查看问题是否仍然存在.当它变得非常简单时,你要么自己发现了问题,要么会有一些你可以在这里发布的东西. (6认同)
  • _特别是,函数只有在模块的顶层定义时才可以picklable。_ 似乎将`functool.partial`应用于顶层函数的结果也是可以pickle的,即使它是在内部定义的另一个功能。 (6认同)
  • 仅晚了5年,但我刚刚遇到了这个问题。事实证明,“顶级”必须比平时更实际地使用:在我看来,函数定义必须在“初始化池”之前(即“ pool = Pool()”行[here]( http://stackoverflow.com/a/20549090/5067311)。我没想到,这可能是OP问题持续存在的原因。 (3认同)
  • 感谢您的回复。我更新了我的问题。不过我不认为这是原因 (2认同)
  • 另外:如果你在模块的顶层定义一个函数,但是它被装饰了,那么引用将是装饰器的输出,你仍然会得到这个错误. (2认同)

Mik*_*rns 84

我会用pathos.multiprocesssing,而不是multiprocessing. pathos.multiprocessing是一个multiprocessing使用的分支dill.dill可以在python中序列化几乎所有内容,因此您可以并行发送更多内容.该pathos叉也有直接与多个参数的函数的工作,因为你需要为类方法的能力.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
Run Code Online (Sandbox Code Playgroud)

获取pathos(如果您愿意dill),请访问:https: //github.com/uqfoundation

  • `pip install pathos`现在可以工作了,`pathos`是python 3兼容的. (9认同)
  • @DanielGoldfarb:“multiprocess”是“multiprocessing”的一个分支,其中“dill”在代码中的多个位置替换了“pickle”......但本质上就是这样。`pathos` 在 `multiprocess` 上提供了一些额外的 API 层,并且还有额外的后端。但是,这就是要点。 (4认同)
  • 干得好.对于其他任何人,我通过以下方式安装了两个库:`sudo pip install git + https:// github.com/uqfoundation/dill.git @ master`和`sudo pip install git + https://github.com/uqfoundation/pathos git的@ master`的 (3认同)
  • @AlexanderMcFarlane我不会用`sudo`安装python包(特别是来自github等外部源).相反,我建议运行:`pip install --user git + ...` (3认同)

ted*_*ivm 28

当这个问题出现时,multiprocessing一个简单的解决方案是从 切换PoolThreadPool。除了导入之外,无需更改代码即可完成此操作

from multiprocessing.pool import ThreadPool as Pool
Run Code Online (Sandbox Code Playgroud)

这是有效的,因为 ThreadPool 与主线程共享内存,而不是创建一个新进程——这意味着不需要酸洗。

这种方法的缺点是 python 不是处理线程的最佳语言——它使用称为全局解释器锁的东西来保持线程安全,这会减慢这里的某些用例。但是,如果您主要与其他系统交互(运行 HTTP 命令、与数据库通信、写入文件系统),那么您的代码可能不受 CPU 的约束,不会受到太大影响。事实上,我在编写 HTTP/HTTPS 基准测试时发现这里使用的线程模型具有较少的开销和延迟,因为创建新进程的开销远高于创建新线程的开销,否则程序只是在等待 HTTP回应。

因此,如果您在 python 用户空间中处理大量内容,这可能不是最好的方法。

  • 但是你只使用一个 CPU(至少对于使用 [GIL](https://wiki.python.org/moin/GlobalInterpreterLock) 的常规 Python 版本),这有点违背了目的。 (14认同)
  • 这实际上取决于目的是什么。全局解释器锁确实意味着一次只有一个实例可以运行 python 代码,但对于严重阻塞的操作(文件系统访问、下载大型或多个文件、运行外部代码),GIL 最终不是问题。在某些情况下,打开新进程(而不是线程)的开销超过了 GIL 开销。 (6认同)
  • 确实如此,谢谢。不过,您可能想在答案中添加一个警告。如今,处理能力的提高主要以更多而不是更强大的 CPU 核心的形式出现,从多核执行切换到单核执行是一个相当显着的副作用。 (6认同)
  • ...只是补充一点,在我的情况下(API 上的 HTTP 请求)它就像一个魅力(而 `pathos` 或其他方法却没有)...并且只需要更改一行 `import` 确实是对代码影响较小。非常感谢@RobertHafner (2认同)
  • 非常感谢您的回答!只需从“Pool”切换到“ThreadPool”即可对我起作用。 (2认同)

roc*_*ker 27

正如其他人所说multiprocessing,只能将Python对象转移到可以被腌制的工作进程.如果您无法按照unutbu的描述重新组织代码,则可以使用dills扩展的pickle/unpickling功能来传输数据(尤其是代码数据),如下所示.

此解决方案仅需要安装dill而不需要其他库pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
Run Code Online (Sandbox Code Playgroud)

  • 我是'dill`和`pathos`作者......虽然你是对的,但是在我的回答中使用`pathos`是不是更好,更清洁,更灵活?或许我有点偏颇...... (5认同)
  • 在撰写本文时我并没有意识到"悲情"的状态,并希望提出一个非常接近答案的解决方案.现在我已经看到了你的解决方案,我同意这是要走的路. (3认同)
  • 感谢发布,我使用这种方法进行无法腌制的dilling/undilling参数:http://stackoverflow.com/questions/27883574/cant-pickle-pyparsing-expression-with-setparseaction-method-needed-for-multi/27892382#27892382 (3认同)
  • @rocksportrocker。我正在阅读这个示例,但无法理解为什么存在显式的“for”循环。我通常会看到并行例程获取一个列表并返回一个没有循环的列表。 (2认同)

Eze*_*ick 15

我发现通过尝试在其上使用分析器,我还可以在完美的代码片段上生成完全错误输出.

请注意,这是在Windows上(分叉不太优雅).

我之前在跑步:

python -m profile -o output.pstats <script> 
Run Code Online (Sandbox Code Playgroud)

并发现删除分析删除了错误并放置分析恢复它.因为我知道过去常用的代码,所以也让我感到沮丧.我正在检查是否有什么东西更新了pool.py ...然后有一种下沉的感觉并消除了分析,就是这样.

在此处发布档案以防其他任何人遇到它.

  • 哇,谢谢你的提及!它在最后一个小时左右让我疯了; 我尝试了一切非常简单的例子 - 似乎没什么用.但我也有通过我的批处理文件运行的探查器:( (2认同)
  • 哦,真是太感谢你了。但这听起来确实很愚蠢,因为它是如此出乎意料。我认为应该在文档中提及。我所拥有的只是一个 import pdb 语句,而一个只有“pass”的简单顶级函数是无法“pickle”的。 (2认同)

Pen*_*esh 6

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)

如果模型对象中有任何内置函数传递给异步作业,也会出现此错误。

因此,请确保检查传递的模型对象没有内置函数。(在我们的例子中,我们使用模型内部的django-model-utilsFieldTracker()函数来跟踪某个字段)。这是相关 GitHub 问题的链接。