将动态加载的函数提交到 ProcessPoolExecutor

bri*_*ium 5 python concurrent.futures

我想将动态加载的函数提交到 concurrent.futures.ProcessPoolExecutor. 这是一个例子。其中module.py包含该功能。

# Content of module.py

def func():
    return 1
Run Code Online (Sandbox Code Playgroud)

然后,剩下的就在file.py

# Content of file.py

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect


def load_function_from_module(path):
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    return mod


def func_top_level():
    return 2


if __name__ == '__main__':
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(func)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())
Run Code Online (Sandbox Code Playgroud)

回溯是

Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func
Run Code Online (Sandbox Code Playgroud)

解决方案1:func用顶级函数包装

放置def myfunc(): return func()在函数加载后并提交myfunc

这对于这个例子是有效的,但是一旦你将整个if __name__ ...块移动到它自己的main()函数中,myfunc()它就会再次变成本地函数,并且 hack 不起作用。由于问题发生在我的应用程序深处,因此这是不可能的。

尝试 2:替换picklecloudpickle

我个人最喜欢的解决方案是改变序列化对象的方式ProcessPoolExecutor。例如,cloudpickle可以序列化func.

虽然,这个答案表明可以注册自定义减速器,但以下 PR 和问题表明该功能不起作用,或者我只是无法替换picklecloudpickle.

非常感谢您的帮助。

cwi*_*olf 2

我使用一个简单的包装类来修补cloudpickleProcessPoolExecutor如下所示:

from concurrent.futures import ProcessPoolExecutor
import cloudpickle

def apply_cloudpickle(fn, /, *args, **kwargs):
    fn = cloudpickle.loads(fn)
    return fn(*args, **kwargs)

class CloudpickleProcessPoolExecutor(ProcessPoolExecutor):
    def submit(self, fn, /, *args, **kwargs):
        return super().submit(apply_cloudpickle, cloudpickle.dumps(fn), *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

然后,当您调用.map其他方法时,cloudpickle将在底层对函数对象进行酸洗。(请注意,参数仍将通过普通pickle管道运行。)

(相同的策略适用于dill等)