在另一个成对的bin数组中获取数据数组最小值的最快方法

Xin*_*ang 7 python numpy scipy pandas dask

我有三个一维数组:

  • idxs: 索引数据
  • weights: 中每个指标的权重 idxs
  • bins:用于计算其中最小重量的 bin。

这是我当前使用的方法idxs来检查weights在哪个 bin 中调用的数据,然后计算 bin 权重的最小值/最大值:

插图

  1. 获取slices显示每个垃圾箱idxs元素所属的。
  2. 排序slicesweights同时。
  3. 计算weights每个 bin(切片)中的最小值。

numpy 方法

import random
import numpy as np

# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

# get which bin idxs belong to
slices = np.digitize(idxs, bins)

# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]

# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]

# get index of each first unque slices
unique_slices, unique_index = np.unique(valid_slices_sort, return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort, unique_index)

# save results
res = np.full((out_size), np.nan)
res[unique_slices-1] = res_sub

print(res)
Run Code Online (Sandbox Code Playgroud)

结果:

array([ 3., nan,  5., nan, nan, nan, nan, nan, nan, nan])
Run Code Online (Sandbox Code Playgroud)

如果我增加到out_size1e7 并洗牌数据,速度(从 np.digitize 到最后)很慢:

13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)

而且,这是每个部分的消耗时间:

np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Run Code Online (Sandbox Code Playgroud)

np.digitize()花费大部分时间:10.8 秒。而且,接下来是argsort:2.02 秒。

我还检查计算所消耗的时间mean使用np.histogram

counts, _ = np.histogram(idxs, bins=out_size, range=(0, out_size))
sums, _ = np.histogram(idxs, bins=out_size, range=(0, out_size), weights = weights, density=False)
mean = sums / np.where(counts == 0, np.nan, counts)

33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)

这类似于我计算最小值的方法。

scipy方法

from scipy.stats import binned_statistic
statistics, _, _ = binned_statistic(idxs, weights, statistic='min', bins=bins)

print(statistics)
Run Code Online (Sandbox Code Playgroud)

结果有点不同,但对于较长的(1e7)混洗数据,速度要慢得多(x6):

array([ 3., nan,  5.])

1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)

概括

我想找出一个更快的方法。如果该方法也适用于dask,那就太好了

用户案例

这是我的真实数据 (1D) 的样子: 真实数据

Fir*_*ger 3

苏丹奥拉兹巴耶夫(SultanOrazbayev)表现出快速的方法;我会添加一个很酷的。

mask = bins[:, None] == idxs[None, :]
result = np.nanmin(np.where(mask, weights, np.nan), axis=-1)
# Note: may produce (expected) runtime warning if bin has no values
Run Code Online (Sandbox Code Playgroud)

当然,你也可以这样做np.nanmaxnp.nanmean等等。

上面假设您的垃圾箱确实是单个值。如果它们是范围,则需要稍微多做一些工作来构建掩码

lower_mask = idxs[None, :] >= bins[:, None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1, ...] = idxs[None, :] < bins[1:, None]
upper_mask[-1, ...] = False

mask = lower_mask & upper_mask
Run Code Online (Sandbox Code Playgroud)

此时您可以np.nanmin像上面一样使用。


Ofcnp.where和创建掩码的广播将使用各自的数据类型创建新的形状数组 (len(bins), len(idxs))。如果您不关心这一点,那么上面的内容可能就足够了。

如果这是一个问题(因为你对 RAM 感到紧张),那么我的第一个建议是购买更多 RAM。如果 - 由于某些愚蠢的原因(例如,金钱) - 这不起作用,您可以weights通过在手动重新跨步视图上使用屏蔽数组来避免复制weights

import numpy.ma as ma

mask = ...

restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=(bins.size, idxs.size), strides=(0, idxs.strides[0]))
masked = ma.masked_array(restrided_weights, mask=~mask, fill_value=np.nan, dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)
Run Code Online (Sandbox Code Playgroud)

weights这可以避免上述运行时警告的副本。

如果您甚至没有足够的内存来构造mask,那么您可以尝试分块处理数据。

最后我检查了一下,当使用手动跨步数组时,Dask 曾经有过有趣的行为。不过,这方面已经做了一些工作,因此您可能需要仔细检查是否已解决,在这种情况下,您可以愉快地并行化上述内容。


根据您对此答案和其他答案的进一步评论进行更新:

您可以分块进行此计算,以避免由于大量 bin(数量级为 1e4)而导致内存问题。将具体数字放入完整示例并添加进度条表明单个核心的运行时间<40 秒。

import numpy.ma as ma
from tqdm import trange
import numpy as np
import random

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs

chunk_size = 100

# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size, len(idxs)), dtype=bool)


result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, ...] = ~((bins[low:high, None] <= idxs[None, :]) & (extended_bins[low+1:high+1, None] > idxs[None, :]))
    mask = mask_buffer[:chunk_size, ...]
    restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=mask.shape, strides=(0, idxs.strides[0]))
    masked = ma.masked_array(restrided_weights, mask=mask, fill_value=np.nan, dtype=np.float64)
    result[low:high] = masked.min(axis=-1).filled(np.nan)
Run Code Online (Sandbox Code Playgroud)

奖励min只有max 一个很酷的技巧可以使用:排序idxsweights基于weights(最小值升序,最大值降序)。这样,您可以立即查找最小/最大值,并可以完全避免屏蔽数组和自定义步幅。这依赖于一些没有很好记录的行为np.argmax,它对布尔数组进行快速传递,并且不搜索整个数组。

它仅适用于这两种情况,对于更复杂的事情,您必须回退到上面的方法(平均),但对于这两种情况,它又削减了大约 70%,并且在单核时钟上运行的频率为 <12秒。

# fast min/max
from tqdm import trange
import numpy as np

# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)

# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]

weights = idxs


order = np.argsort(weights)
weights_sorted = np.empty((len(weights) + 1), dtype=np.float64)
weights_sorted[:-1] = weights[order]
weights_sorted[-1] = np.nan

idxs_sorted = idxs[order]

extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity

# preallocate buffers to avoid array creation in main loop
chunk_size = 1000
mask_buffer = np.empty((chunk_size, len(idxs) + 1), dtype=bool)
mask_buffer[:, -1] = True
result = np.empty_like(bins, dtype=np.float64)

for low in trange(0, len(bins), chunk_size):
    high = min(low + chunk_size, len(bins))
    chunk_size = high - low
    mask_buffer[:chunk_size, :-1] = (bins[low:high, None] <= idxs_sorted[None, :]) & (extended_bins[low+1:high+1, None] > idxs_sorted[None, :])
    mask = mask_buffer[:chunk_size, ...]
    weight_idx = np.argmax(mask, axis=-1)

    result[low:high] = weights_sorted[weight_idx]
Run Code Online (Sandbox Code Playgroud)