对使用 concurrent.futures.ProcessPoolExecutor 动态创建的函数的限制

Ben*_*mer 3 python pickle multiprocessing

我正在尝试使用我在其他函数中动态创建的函数进行一些多处理。如果提供给 ProcessPoolExecutor 的函数是模块级的,我似乎可以运行这些:

def make_func(a):
    def dynamic_func(i):
        return i, i**2 + a
    return dynamic_func

f_dyns = [make_func(a) for a in range(10)]
def loopfunc(i):
    return f_dyns[i](i)

with concurrent.futures.ProcessPoolExecutor(3) as executor:
    for i,r in executor.map(loopfunc, range(10)):
        print(i,":",r)
Run Code Online (Sandbox Code Playgroud)

输出:

0 : 0
1 : 2
2 : 6
3 : 12
4 : 20
5 : 30
6 : 42
7 : 56
8 : 72
9 : 90
Run Code Online (Sandbox Code Playgroud)

但是,如果多处理是由类函数启动的,我就不能这样做:

class Test:
    def __init__(self,myfunc):
        self.f = myfunc

    def loopfunc(self,i):
        return self.f(i)

    def run(self):
        with concurrent.futures.ProcessPoolExecutor(3) as executor:
            for i,r in executor.map(self.loopfunc, range(10)):
                print(i,":",r)

o2 = Test(make_func(1))
o2.run()
Run Code Online (Sandbox Code Playgroud)

输出:

Traceback (most recent call last):
  File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_func.<locals>.dynamic_func'
Run Code Online (Sandbox Code Playgroud)

另一方面,如果我不使用动态生成的函数,我可以在类函数上运行多处理就好了。有没有办法解决这个问题?我尝试将动态生成的函数添加到“全局”字典中,但这似乎没有帮助:

def make_func_glob(a):
    def dynamic_func(i):
        return i, i**2 + a
    globals()['my_func_{0}'.format(a)] = dynamic_func

make_func_glob(1)
print("test:", my_func_1(3))
o3 = Test(my_func_1)
o3.run()
Run Code Online (Sandbox Code Playgroud)

输出:

test: (3, 10)
Traceback (most recent call last):
  File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/home/farmer/anaconda3/envs/general/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_func_glob.<locals>.dynamic_func'
Run Code Online (Sandbox Code Playgroud)

所以即使我将它添加到“全局”字典中,python 仍然认为它是一个本地对象。像这种“全球”的想法会很好,我不需要任何花哨的东西。为了方便起见,我只是动态创建这些函数。我会很高兴它们成为全局对象。它们将始终由模块定义,只有一堆具有几乎相同的定义,因此以编程方式定义它们比手动将它们全部写出来更方便。所以我认为有可能以某种方式让python将它们识别为“真正的”函数,就像我通过'exec'定义它们一样。或者至少足够接近我可以在我的并行代码中使用它们。

Mic*_*mza 5

正如错误消息所暗示的那样,它更多地与酸洗而不是动态生成的函数有关。来自https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

只能执行和返回可腌制的对象。

https://docs.python.org/3/library/pickle.html#what-c​​an-be-pickled-and-unpickled,可以腌制的函数种类:

在模块顶层定义的函数(使用 def,而不是 lambda)

这表明不能腌制其他类型的函数。从问题中的代码中跳出一个不符合此要求的函数:dynamic_func从...

def make_func(a):
    def dynamic_func(i):
        return i, i**2 + a
    return dynamic_func
Run Code Online (Sandbox Code Playgroud)

......你暗示这是问题......

所以我会认为有可能以某种方式让 python 将它们识别为“真正的”函数

你可以!您可以放在dynamic_func顶层,并使用partial而不是关闭...

from functools import partial
def dynamic_func(a, i):
    return i, i**2 + a

def make_func(a):
    return partial(dynamic_func, a)
Run Code Online (Sandbox Code Playgroud)

所以全...

import concurrent.futures
from functools import partial

def dynamic_func(a, i):
    return i, i**2 + a

def make_func(a):
    return partial(dynamic_func, a)

f_dyns = [make_func(a) for a in range(10)]
def loopfunc(i):
    return f_dyns[i](i)


class Test:
    def __init__(self, myfunc):
        self.f = myfunc

    def loopfunc(self, i):
        return self.f(i)

    def run(self):
        with concurrent.futures.ProcessPoolExecutor(3) as executor:
            for i,r in executor.map(self.loopfunc, range(10)):
                print(i,":",r)

o2 = Test(make_func(1))
o2.run()
Run Code Online (Sandbox Code Playgroud)

但是......为什么没有类的原始形式有效,我不知道。根据我的理解,它会试图腌制一个非顶级函数,所以我认为我的理解是有缺陷的。