简单的numpy.apply_along_axis()并行化?

Eri*_*got 4 python arrays parallel-processing performance numpy

如何将函数对NumPy数组的元素的应用numpy.apply_along_axis()并行化,以利用多核?在通常情况下,要应用的函数的所有调用都是独立的,这似乎是很自然的事情。

在我的特定情况下,如果此事宜,应用的轴线是轴0: np.apply_along_axis(func, axis=0, arr=param_grid)np是NumPy的)。

我快速浏览了Numba,但似乎无法通过如下循环获得这种并行化:

@numba.jit(parallel=True)
result = np.empty(shape=params.shape[1:])
for index in np.ndindex(*result.shape)):  # All the indices of params[0,...]
    result[index] = func(params[(slice(None),) + index])  # Applying func along axis 0
Run Code Online (Sandbox Code Playgroud)

NumPy中显然还有一个编译选项可通过OpenMP进行并行化,但似乎无法通过MacPorts进行访问。

还可以考虑将数组切成几块并使用线程(以避免复制数据),然后在每块上并行应用函数。这比我要查找的要复杂(如果全局解释器锁释放不充分,则可能不起作用)。

能够以简单的方式使用多个内核来完成简单的可并行化的任务,例如将函数应用于数组的所有元素(这实际上是这里所需要的,函数func()只需一个一维数组),这非常好。参数)。

Eri*_*got 7

好的,我已经解决了:一种想法是使用标准multiprocessing模块,并将原始数组分成几个块(以限制与工作人员的通信开销)。可以相对轻松地完成此操作,如下所示:

import multiprocessing

import numpy as np

def parallel_apply_along_axis(func1d, axis, arr, *args, **kwargs):
    """
    Like numpy.apply_along_axis(), but takes advantage of multiple
    cores.
    """        
    # Effective axis where apply_along_axis() will be applied by each
    # worker (any non-zero axis number would work, so as to allow the use
    # of `np.array_split()`, which is only done on axis 0):
    effective_axis = 1 if axis == 0 else axis
    if effective_axis != axis:
        arr = arr.swapaxes(axis, effective_axis)

    # Chunks for the mapping (only a few chunks):
    chunks = [(func1d, effective_axis, sub_arr, args, kwargs)
              for sub_arr in np.array_split(arr, multiprocessing.cpu_count())]

    pool = multiprocessing.Pool()
    individual_results = pool.map(unpacking_apply_along_axis, chunks)
    # Freeing the workers:
    pool.close()
    pool.join()

    return np.concatenate(individual_results)
Run Code Online (Sandbox Code Playgroud)

unpacking_apply_along_axis()所应用的函数Pool.map()应按其应有的方式分开(以便子进程可以导入它),并且只是一个薄包装器,用于处理Pool.map()仅接受单个参数的事实:

def unpacking_apply_along_axis((func1d, axis, arr, args, kwargs)):
    """
    Like numpy.apply_along_axis(), but with arguments in a tuple
    instead.

    This function is useful with multiprocessing.Pool().map(): (1)
    map() only handles functions that take a single argument, and (2)
    this function can generally be imported from a module, as required
    by map().
    """
    return np.apply_along_axis(func1d, axis, arr, *args, **kwargs)
Run Code Online (Sandbox Code Playgroud)

(在Python 3中,该代码应写为

def unpacking_apply_along_axis(all_args):
    (func1d, axis, arr, args, kwargs) = all_args
Run Code Online (Sandbox Code Playgroud)

因为参数拆包已删除)。

在我的特殊情况下,这导致2个具有超线程功能的内核的速度提高了2倍。接近4倍的因子本来更好,但是仅用几行代码就可以达到很好的速度,并且对于具有更多内核(非常常见)的机器来说应该更好。也许有一种避免数据复制和使用共享内存的方法(也许通过multiprocessing模块本身)?

  • 感谢您上面的回答中的代码,这对我来说是有希望的,因为我也面临着一个令人尴尬的可并行化问题的问题,而`np.apply_along_axis()`到现在为止我还不知道。我要补充的另一个想法是dask可以帮助并行化,据推测是这样的,因为我还没有设法使其并行化,因为它需要使用dask数组而不是numpy数组,并且不是所有的numpy功能都可以从dask中获得。 API。参见https://dask.org/ (3认同)