在 numpy 行上并行化循环

Max*_*nke 5 python numpy dask

我需要将相同的函数应用于 numpy 数组中的每一行,并将结果再次存储在 numpy 数组中。

# states will contain results of function applied to a row in array
states = np.empty_like(array)

for i, ar in enumerate(array):
    states[i] = function(ar, *args)

# do some other stuff on states
Run Code Online (Sandbox Code Playgroud)

function对我的数据进行一些非平凡的过滤,并在条件为 True 和为 False 时返回一个数组。function可以是纯 python 或 cython 编译的。对行的过滤操作很复杂,并且可能依赖于行中先前的值,这意味着我不能以逐个元素的方式对整个数组进行操作

例如,有没有办法在 dask 中做这样的事情?

MRo*_*lin 5

达斯克解决方案

您可以通过按行对数组进行分块,调用map_blocks,然后计算结果来使用 dask.array

ar = ...
x = da.from_array(ar, chunks=(1, arr.shape[1]))
x.map_blocks(function, *args)
states = x.compute()
Run Code Online (Sandbox Code Playgroud)

默认情况下,这将使用线程,您可以通过以下方式使用进程

from dask.multiprocessing import get
states = x.compute(get=get)
Run Code Online (Sandbox Code Playgroud)

泳池解决方案

然而,对于像这样令人尴尬的并行计算,dask 可能有点矫枉过正,你可以使用线程池

from multiprocessing.pool import ThreadPool
pool = ThreadPool()

ar = ...
states = np.empty_like(array)

def f(i):
    states[i] = function(ar[i], *args)

pool.map(f, range(len(ar)))
Run Code Online (Sandbox Code Playgroud)

您可以切换到具有以下更改的流程

from multiprocessing import Pool
pool = Pool()
Run Code Online (Sandbox Code Playgroud)