如何在不处于顶层的情况下解决python多处理的酸洗错误?

Tim*_*Tim 7 python pickle multiprocessing python-multiprocessing

我已经多次研究过这个问题,但是没有找到一个可以在我的情况下工作的解决方法,或者我理解的解决方法,所以请耐心等待.

基本上,我有一个功能的层次组织,这阻止我在顶层进行多处理.不幸的是,我不相信我可以改变程序的布局 - 因为我需要在初始输入后创建的所有变量.

例如,说我有这个:

import multiprocessing

  def calculate(x):
    # here is where I would take this input x (and maybe a couple more inputs)
    # and build a larger library of variables that I use further down the line

    def domath(y):
      return x * y

    pool = multiprocessing.Pool(3)
    final= pool.map(domath, range(3))

calculate(2)
Run Code Online (Sandbox Code Playgroud)

这会产生以下错误:

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Run Code Online (Sandbox Code Playgroud)

我在考虑全局,但我担心我必须定义太多,这可能会使我的程序减慢很多.是否有任何解决方法而无需重组整个计划?

Mik*_*rns 7

你可以使用pathos.multiprocessing,这是一个multiprocessing使用dill序列化器而不是pickle. dill可以在python中序列化几乎任何东西.然后,无需编辑您的代码.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> def calculate(x):
...   def domath(y):
...     return x*y
...   return Pool().map(domath, range(3))
... 
>>> calculate(2)
[0, 2, 4]
Run Code Online (Sandbox Code Playgroud)

你甚至可以坚持下去......因为大多数东西都是腌制的.不需要你必须用纯净烹饪的奇怪的非pythonic解决方案multiprocessing.

>>> class Foo(object):
...   def __init__(self, x):
...     self.x = x
...   def doit(self, y):
...     return ProcessingPool().map(self.squared, calculate(y+self.x))
...   def squared(self, z):
...     return z*z
... 
>>> def thing(obj, y):
...   return getattr(obj, 'doit')(y)
... 
>>> ProcessingPool().map(thing, ProcessingPool().map(Foo, range(3)), range(3))
[[0, 0, 0], [0, 4, 16], [0, 16, 64]]
Run Code Online (Sandbox Code Playgroud)

获取pathos此:https://github.com/uqfoundation

  • @deepelement:大多数任何 python 包,比如 `joblib`,都依赖于 `dill` 或 `cloudpickle` 进行高级序列化。 (2认同)

mot*_*oku 6

您遇到的问题实际上是一个功能。pickle 源实际上旨在防止此类行为,以防止执行恶意代码。在处理任何适用的安全实施时,请考虑这一点。

首先,我们有一些进口。

import marshal
import pickle
import types
Run Code Online (Sandbox Code Playgroud)

这里我们有一个函数,它接受一个函数作为参数,pickles 对象的各个部分,然后返回一个包含所有部分的元组:

def pack(fn):
    code = marshal.dumps(fn.__code__)
    name = pickle.dumps(fn.__name__)
    defs = pickle.dumps(fn.__defaults__)
    clos = pickle.dumps(fn.__closure__)
    return (code, name, defs, clos)
Run Code Online (Sandbox Code Playgroud)

接下来我们有一个函数,它接受我们转换后的函数的四个部分。它翻译这四个部分,然后创建然后从这些部分返回一个函数。您应该注意全局变量被重新引入这里,因为我们的流程不处理这些:

def unpack(code, name, defs, clos):
    code = marshal.loads(code)
    glob = globals()
    name = pickle.loads(name)
    defs = pickle.loads(defs)
    clos = pickle.loads(clos)
    return types.FunctionType(code, glob, name, defs, clos)
Run Code Online (Sandbox Code Playgroud)

这里我们有一个测试函数。注意我在函数的范围内放置了一个导入。全局变量不通过我们的酸洗过程处理:

def test_function(a, b):
    from random import randint
    return randint(a, b)
Run Code Online (Sandbox Code Playgroud)

最后,我们打包我们的测试对象并打印结果以确保一切正常:

packed = pack(test_function)
print((packed))
Run Code Online (Sandbox Code Playgroud)

最后,我们解包我们的函数,将它分配给一个变量,调用它,并打印它的输出:

unpacked = unpack(*packed)
print((unpacked(2, 20)))
Run Code Online (Sandbox Code Playgroud)

如果您有任何问题,请发表评论。