在joblib`Parallel`上下文中对`matlab`对象进行腌制时出错

cwi*_*olf 8 python matlab python-import python-3.x joblib

我正在从Python上下文中并行运行一些Matlab代码(我知道,但这就是要发生的事情),并且遇到了涉及的导入错误matlab.double。相同的代码在中可以正常工作multiprocessing.Pool,因此我很难弄清楚问题出在哪里。这是一个最小的再现测试用例。

import matlab
from multiprocessing import Pool
from joblib import Parallel, delayed

# A global object that I would like to be available in the parallel subroutine
x = matlab.double([[0.0]])

def f(i):
    print(i, x)

with Pool(4) as p:
    p.map(f, range(10))
    # This prints 1, [[0.0]]\n2, [[0.0]]\n... as expected

for _ in Parallel(4, backend='multiprocessing')(delayed(f)(i) for i in range(10)):
    pass
# This also prints 1, [[0.0]]\n2, [[0.0]]\n... as expected

# Now run with default `backend='loky'`
for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
    pass
# ^ this crashes.
Run Code Online (Sandbox Code Playgroud)

因此,唯一有问题的是使用'loky'后端的。完整的回溯是:

exception calling callback for <Future at 0x7f63b5a57358 state=finished raised BrokenProcessPool>
joblib.externals.loky.process_executor._RemoteTraceback: 
'''
Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
    from _internal.mlarray_sequence import _MLArrayMetaClass
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
    from _internal.mlarray_utils import _get_strides, _get_size, \
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
    import matlab
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
    from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
    fn, *args, **kwargs)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
joblib.externals.loky.process_executor._RemoteTraceback: 
'''
Traceback (most recent call last):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
    from _internal.mlarray_sequence import _MLArrayMetaClass
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
    from _internal.mlarray_utils import _get_strides, _get_size, \
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
    import matlab
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
    from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test.py", line 20, in <module>
    for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 934, in __call__
    self.retrieve()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 833, in retrieve
    self._output.extend(job.get(timeout=self.timeout))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 521, in wrap_future_result
    return future.result(timeout=timeout)
  File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
    callback(self)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
    self.parallel.dispatch_next()
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
    if not self.dispatch_one_batch(self._original_iterator):
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
    self._dispatch(tasks)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
    job = self._backend.apply_async(batch, callback=cb)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
    future = self._workers.submit(SafeFunction(func))
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
    fn, *args, **kwargs)
  File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
    raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
Run Code Online (Sandbox Code Playgroud)

查看回溯,似乎根本原因是matlab在子进程中导入软件包的问题。

可能值得注意的是,如果我已经定义了x = np.array([[0.0]])(在导入之后numpy as np),那么这一切都可以正常运行。当然,主进程对任何matlab导入都没有问题,因此我不确定子进程为什么会这样做。

我不确定此错误是否与matlab软件包特别相关,还是与全局变量和cloudpickle或有关loky。在我的应用程序中,坚持使用会有所帮助loky,因此,我将不胜感激!

我还应注意,我使用的是Python的官方Matlab引擎:https : //www.mathworks.com/help/matlab/matlab-engine-for-python.html。我想这可能会使其他人很难尝试测试用例,因此我希望我可以使用以外的其他类型来重现此错误matlab.double,但是我还没有找到其他错误。

深入研究,我发现导入matlab软件包的过程比我预期的要循环得多,而且我推测这可能是问题的一部分?问题是,当import matlabloky的运行时_ForkingPickler,首先matlab/mlarray.py会导入一些文件,然后导入一些其他文件(其中一个包含)import matlab,这会导致matlab/__init__.py运行该文件,而该文件在内部具有from mlarray import double, single, uint8, ...导致崩溃的行。

难道这就是问题所在吗?如果是这样,为什么我可以在主进程中导入此模块,而不在loky后端导入?

gdl*_*lmx 6

该错误是由于子进程中全局对象的加载顺序不正确引起的。在加载全局变量时尚未导入的_ForkingPickler.loads(res) -> ... -> import matlab -> from mlarray import ... 回溯中可以清楚地看到 。matlabxcloudpickle

joblibwithloky似乎将模块视为普通的全局对象并将它们动态发送到子进程。joblib 不记录这些对象/模块的定义顺序。因此,它们在子进程中以随机顺序加载(初始化)。

一个简单的解决方法是手动 pickle matlab 对象并在将 matlab 导入到函数中后加载它。

import matlab
import pickle

px = pickle.dumps(matlab.double([[0.0]]))

def f(i):
    import matlab
    x=pickle.loads(px)
    print(i, x)
Run Code Online (Sandbox Code Playgroud)

当然,您也可以使用joblib.dumpsloads序列化对象。

使用初始化程序

感谢@Aaron的建议,您还可以在加载之前使用initializer用于loky)导入Matlab x

目前没有简单的 API 可以指定initializer。所以我写了一个简单的函数:

def with_initializer(self, f_init):
    # Overwrite initializer hook in the Loky ProcessPoolExecutor
    # https://github.com/tomMoral/loky/blob/f4739e123acb711781e46581d5ed31ed8201c7a9/loky/process_executor.py#L850
    hasattr(self._backend, '_workers') or self.__enter__()
    origin_init = self._backend._workers._initializer
    def new_init():
        origin_init()
        f_init()
    self._backend._workers._initializer = new_init if callable(origin_init) else f_init
    return self
Run Code Online (Sandbox Code Playgroud)

它有点 hacky,但与当前版本的 joblib 和 loky 配合得很好。然后你可以像这样使用它:

import matlab
from joblib import Parallel, delayed

x = matlab.double([[0.0]])

def f(i):
    print(i, x)

def _init_matlab():
    import matlab

with Parallel(4) as p:
    for _ in with_initializer(p, _init_matlab)(delayed(f)(i) for i in range(10)):
        pass
Run Code Online (Sandbox Code Playgroud)

我希望joblib的开发者将来能够initializer在构造函数中添加参数Parallel