对 Dask 数组的列应用函数

cha*_*u13 5 python dask dask-delayed dask-distributed dask-dataframe

将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。

\n

我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。

\n

考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。

\n
import numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n    fn = ECDF(x)\n    return fn(x)\n\ndef ecdf_array(X):\n\n    res = np.zeros_like(X)\n    for i in range(X.shape[1]):\n        res[:,i] = ecdf(X[:,i])\n        \n    return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit -r 10 foo1 = ecdf_array(Xarr)\n\n### adjusting chunk size to 2d-array and map_blocks\nX = X.rechunk(chunks=(X.shape[0],np.ceil(X.shape[1]/n_workers)))\nXm = X.map_blocks(lambda x: ecdf_array(x),meta = np.array((), dtype=\'float\'))\n%timeit -r 10 foo2 = Xm.compute()\n\n### adjusting chunk size to column size and map_blocks\nX = X.rechunk(chunks=(X.shape[0],1))\nXm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)),1),meta = np.array((), dtype=\'float\'))\n%timeit -r 10 foo3 = Xm.compute()\n\n### map over columns by slicing\nXm = client.map(lambda i: ecdf(np.asarray(X[:,i])),range(X.shape[1]))\nXm = client.submit(lambda x: da.transpose(da.vstack(x)),Xm)\n%timeit -r 10 foo4 = Xm.result()\n\n### apply_along_axis\nXaa = da.apply_along_axis(lambda x: np.expand_dims(ecdf(x),1), 0, X, dtype=X.dtype, shape=X.shape)\n%timeit -r 10 foo5 = Xaa.compute()\n\n### lazy loop\nXl = []\n\nfor i in range(X.shape[1]):\n    Xl.append(dask.delayed(ecdf)(X[:,i]))\n    \nXl = dask.delayed(da.vstack)(Xl)\n%timeit -r 10 foo6 = Xl.compute()\n
Run Code Online (Sandbox Code Playgroud)\n

根据我的基准测试,“通过切片映射列”是最快的方法,其次是“将块大小调整为列大小和map_blocks”和非并行“apply_along_axis”。

\n
\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n \n\n\n\n\n\n\n\n\n
方法结果(10 个循环)
传统的 for 循环2.16 秒 \xc2\xb1 82.3 毫秒
将块大小调整为二维数组和map_blocks1.26 秒 \xc2\xb1 301 毫秒
将块大小调整为列大小和map_blocks926 毫秒 \xc2\xb1 31.9
通过切片映射列316 毫秒 \xc2\xb1 11.5 毫秒
沿轴应用1.01 秒 \xc2\xb1 18.7 毫秒
惰性循环1.4 秒 \xc2\xb1 352 毫秒
\n
\n

根据我对 Dask 背后的想法的理解,我预计“将块大小调整为 2d 数组和 map_blocks”方法是最快的。表现最好的两种方法似乎并不是很“Dasky”,同时非并行 apply_along_axis 排名第三。所有这些让我怀疑我做错了什么。

\n

Sul*_*yev 2

据我所知,您的代码看起来是正确的(请参阅下面的解释,了解为什么 的性能map over columns by slicing快得令人误解)。经过一些小的重构,“ dask-y”版本可能是:

from dask.array.random import random
from numpy import zeros
from statsmodels.distributions.empirical_distribution import ECDF

n_rows = 100_000
X = random((n_rows, 100), chunks=(n_rows, 1))

_ECDF = lambda x: ECDF(x.squeeze())(x)
meta = zeros((n_rows, 1), dtype="float")
foo0 = X.map_blocks(_ECDF, meta=meta)

# executing foo0.compute() should take about 0.8s
Run Code Online (Sandbox Code Playgroud)

请注意,dask 数组是使用适当的分块(每个块一列)来初始化的,而在当前代码中,执行计时将包括重新分块数组的时间。

就整体加速而言,单个计算很小(50 毫秒的规模),因此为了减少任务数量,可以将多个列的多个处理分成一个块。然而,由于迭代 numpy 数组的列,这需要权衡与减慢相关的问题。主要优点是减轻了调度程序的负担。根据最终数据集的规模和可用的计算资源,分块版本可能比非分块版本(即第一个片段)稍有优势:

from dask.array.random import random
from numpy import stack, zeros
from statsmodels.distributions.empirical_distribution import ECDF

n_rows = 100_000
n_cols = 100
chunk_size = (n_rows, 10)

X = random((n_rows, n_cols), chunks=chunk_size)

_ECDF = lambda x: ECDF(x.squeeze())(x)


def block_ECDF(x):
    return stack([_ECDF(column) for column in x.T], axis=1)


meta = zeros(chunk_size, dtype="float")
foo0 = X.map_blocks(block_ECDF, meta=meta)
# executing foo0.compute() should take about 0.8s
Run Code Online (Sandbox Code Playgroud)

请注意,基准测试中执行速度最快的是map over columns by slicing. 然而,这是一种误导,因为 python 在这里计时的只是计算结果的集合。大部分时间都花在计算上,因此这种方法的准确计时方法是在提交 future 时启动计时器,在收集结果时结束计时器。