cha*_*u13 5 python dask dask-delayed dask-distributed dask-dataframe
将函数应用于 Dask 数组的每一列的最有效方法是什么?如下所述,我已经尝试了很多方法,但我仍然怀疑我对 Dask 的使用相当业余。
\n我有一个相当宽且相当长的数组,大小约为 3,000,000 x 10,000。我想将 ecdf 函数应用于该数组的每一列。堆叠在一起的各个列结果应生成与输入数组具有相同维度的数组。
\n考虑以下测试,让我知道哪种方法是理想的方法或者我可以如何改进。我知道,我可以只使用最快的,但我真的想最大限度地利用 Dask 的可能性。阵列也可以大数倍。与此同时,我的基准测试结果令我感到惊讶。也许我没有正确理解 Dask 背后的逻辑。
\nimport numpy as np\nimport dask\nimport dask.array as da\nfrom dask.distributed import Client, LocalCluster\nfrom statsmodels.distributions.empirical_distribution import ECDF\n\n### functions\ndef ecdf(x):\n fn = ECDF(x)\n return fn(x)\n\ndef ecdf_array(X):\n\n res = np.zeros_like(X)\n for i in range(X.shape[1]):\n res[:,i] = ecdf(X[:,i])\n \n return res\n\n### set up scheduler / workers\nn_workers = 10\ncluster = LocalCluster(n_workers=n_workers, threads_per_worker=4)\nclient = Client(cluster)\n\n### create data set \nX = da.random.random((100000,100)) #dask\nXarr = X.compute() #numpy\n\n### traditional for loop\n%timeit -r 10 foo1 = ecdf_array(Xarr)\n\n### adjusting chunk size to 2d-array and map_blocks\nX = X.rechunk(chunks=(X.shape[0],np.ceil(X.shape[1]/n_workers)))\nXm = X.map_blocks(lambda x: ecdf_array(x),meta = np.array((), dtype=\'float\'))\n%timeit -r 10 foo2 = Xm.compute()\n\n### adjusting chunk size to column size and map_blocks\nX = X.rechunk(chunks=(X.shape[0],1))\nXm = X.map_blocks(lambda x: np.expand_dims(ecdf(np.squeeze(x)),1),meta = np.array((), dtype=\'float\'))\n%timeit -r 10 foo3 = Xm.compute()\n\n### map over columns by slicing\nXm = client.map(lambda i: ecdf(np.asarray(X[:,i])),range(X.shape[1]))\nXm = client.submit(lambda x: da.transpose(da.vstack(x)),Xm)\n%timeit -r 10 foo4 = Xm.result()\n\n### apply_along_axis\nXaa = da.apply_along_axis(lambda x: np.expand_dims(ecdf(x),1), 0, X, dtype=X.dtype, shape=X.shape)\n%timeit -r 10 foo5 = Xaa.compute()\n\n### lazy loop\nXl = []\n\nfor i in range(X.shape[1]):\n Xl.append(dask.delayed(ecdf)(X[:,i]))\n \nXl = dask.delayed(da.vstack)(Xl)\n%timeit -r 10 foo6 = Xl.compute()\nRun Code Online (Sandbox Code Playgroud)\n根据我的基准测试,“通过切片映射列”是最快的方法,其次是“将块大小调整为列大小和map_blocks”和非并行“apply_along_axis”。
\n| 方法 | 结果(10 个循环) |
|---|---|
| 传统的 for 循环 | 2.16 秒 \xc2\xb1 82.3 毫秒 |
| 将块大小调整为二维数组和map_blocks | 1.26 秒 \xc2\xb1 301 毫秒 |
| 将块大小调整为列大小和map_blocks | 926 毫秒 \xc2\xb1 31.9 |
| 通过切片映射列 | 316 毫秒 \xc2\xb1 11.5 毫秒 |
| 沿轴应用 | 1.01 秒 \xc2\xb1 18.7 毫秒 |
| 惰性循环 | 1.4 秒 \xc2\xb1 352 毫秒 |
根据我对 Dask 背后的想法的理解,我预计“将块大小调整为 2d 数组和 map_blocks”方法是最快的。表现最好的两种方法似乎并不是很“Dasky”,同时非并行 apply_along_axis 排名第三。所有这些让我怀疑我做错了什么。
\n据我所知,您的代码看起来是正确的(请参阅下面的解释,了解为什么 的性能map over columns by slicing快得令人误解)。经过一些小的重构,“ dask-y”版本可能是:
from dask.array.random import random
from numpy import zeros
from statsmodels.distributions.empirical_distribution import ECDF
n_rows = 100_000
X = random((n_rows, 100), chunks=(n_rows, 1))
_ECDF = lambda x: ECDF(x.squeeze())(x)
meta = zeros((n_rows, 1), dtype="float")
foo0 = X.map_blocks(_ECDF, meta=meta)
# executing foo0.compute() should take about 0.8s
Run Code Online (Sandbox Code Playgroud)
请注意,dask 数组是使用适当的分块(每个块一列)来初始化的,而在当前代码中,执行计时将包括重新分块数组的时间。
就整体加速而言,单个计算很小(50 毫秒的规模),因此为了减少任务数量,可以将多个列的多个处理分成一个块。然而,由于迭代 numpy 数组的列,这需要权衡与减慢相关的问题。主要优点是减轻了调度程序的负担。根据最终数据集的规模和可用的计算资源,分块版本可能比非分块版本(即第一个片段)稍有优势:
from dask.array.random import random
from numpy import stack, zeros
from statsmodels.distributions.empirical_distribution import ECDF
n_rows = 100_000
n_cols = 100
chunk_size = (n_rows, 10)
X = random((n_rows, n_cols), chunks=chunk_size)
_ECDF = lambda x: ECDF(x.squeeze())(x)
def block_ECDF(x):
return stack([_ECDF(column) for column in x.T], axis=1)
meta = zeros(chunk_size, dtype="float")
foo0 = X.map_blocks(block_ECDF, meta=meta)
# executing foo0.compute() should take about 0.8s
Run Code Online (Sandbox Code Playgroud)
请注意,基准测试中执行速度最快的是map over columns by slicing. 然而,这是一种误导,因为 python 在这里计时的只是计算结果的集合。大部分时间都花在计算上,因此这种方法的准确计时方法是在提交 future 时启动计时器,在收集结果时结束计时器。
| 归档时间: |
|
| 查看次数: |
682 次 |
| 最近记录: |