Xin*_*ang 7 python numpy scipy pandas dask
我有三个一维数组:
idxs
: 索引数据weights
: 中每个指标的权重 idxs
bins
:用于计算其中最小重量的 bin。这是我当前使用的方法idxs
来检查weights
在哪个 bin 中调用的数据,然后计算 bin 权重的最小值/最大值:
slices
显示每个垃圾箱idxs
元素所属的。slices
和weights
同时。weights
每个 bin(切片)中的最小值。import random
import numpy as np
# create example data
out_size = int(10)
bins = np.arange(3, out_size-3)
idxs = np.arange(0, out_size)
#random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
# get which bin idxs belong to
slices = np.digitize(idxs, bins)
# get index and weights in bins
valid = (bins.max() >= idxs) & (idxs >= bins.min())
valid_slices = slices[valid]
valid_weights = weights[valid]
# sort slice and weights
sort_index = valid_slices.argsort()
valid_slices_sort = valid_slices[sort_index]
valid_weights_sort = valid_weights[sort_index]
# get index of each first unque slices
unique_slices, unique_index = np.unique(valid_slices_sort, return_index=True)
# calculate the minimum
res_sub = np.minimum.reduceat(valid_weights_sort, unique_index)
# save results
res = np.full((out_size), np.nan)
res[unique_slices-1] = res_sub
print(res)
Run Code Online (Sandbox Code Playgroud)
结果:
array([ 3., nan, 5., nan, nan, nan, nan, nan, nan, nan])
Run Code Online (Sandbox Code Playgroud)
如果我增加到out_size
1e7 并洗牌数据,速度(从 np.digitize 到最后)很慢:
13.5 s ± 136 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
而且,这是每个部分的消耗时间:
np.digitize: 10.8 s ± 12.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
valid: 171 ms ± 3.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
argsort and slice: 2.02 s ± 33.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
unique: 9.9 ms ± 113 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
np.minimum.reduceat: 5.11 ms ± 52.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Run Code Online (Sandbox Code Playgroud)
np.digitize()
花费大部分时间:10.8 秒。而且,接下来是argsort
:2.02 秒。
我还检查计算所消耗的时间mean
使用np.histogram
:
counts, _ = np.histogram(idxs, bins=out_size, range=(0, out_size))
sums, _ = np.histogram(idxs, bins=out_size, range=(0, out_size), weights = weights, density=False)
mean = sums / np.where(counts == 0, np.nan, counts)
33.2 s ± 3.47 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
这类似于我计算最小值的方法。
from scipy.stats import binned_statistic
statistics, _, _ = binned_statistic(idxs, weights, statistic='min', bins=bins)
print(statistics)
Run Code Online (Sandbox Code Playgroud)
结果有点不同,但对于较长的(1e7)混洗数据,速度要慢得多(x6):
array([ 3., nan, 5.])
1min 20s ± 6.93 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
Run Code Online (Sandbox Code Playgroud)
我想找出一个更快的方法。如果该方法也适用于dask,那就太好了!
苏丹奥拉兹巴耶夫(SultanOrazbayev)表现出快速的方法;我会添加一个很酷的。
mask = bins[:, None] == idxs[None, :]
result = np.nanmin(np.where(mask, weights, np.nan), axis=-1)
# Note: may produce (expected) runtime warning if bin has no values
Run Code Online (Sandbox Code Playgroud)
当然,你也可以这样做np.nanmax
,np.nanmean
等等。
上面假设您的垃圾箱确实是单个值。如果它们是范围,则需要稍微多做一些工作来构建掩码
lower_mask = idxs[None, :] >= bins[:, None]
upper_mask = np.empty_like(lower_mask)
upper_mask[:-1, ...] = idxs[None, :] < bins[1:, None]
upper_mask[-1, ...] = False
mask = lower_mask & upper_mask
Run Code Online (Sandbox Code Playgroud)
此时您可以np.nanmin
像上面一样使用。
Ofcnp.where
和创建掩码的广播将使用各自的数据类型创建新的形状数组 (len(bins), len(idxs))。如果您不关心这一点,那么上面的内容可能就足够了。
如果这是一个问题(因为你对 RAM 感到紧张),那么我的第一个建议是购买更多 RAM。如果 - 由于某些愚蠢的原因(例如,金钱) - 这不起作用,您可以weights
通过在手动重新跨步视图上使用屏蔽数组来避免复制weights
import numpy.ma as ma
mask = ...
restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=(bins.size, idxs.size), strides=(0, idxs.strides[0]))
masked = ma.masked_array(restrided_weights, mask=~mask, fill_value=np.nan, dtype=np.float64)
result = masked.min(axis=-1).filled(np.nan)
Run Code Online (Sandbox Code Playgroud)
weights
这可以避免上述运行时警告的副本。
如果您甚至没有足够的内存来构造mask
,那么您可以尝试分块处理数据。
最后我检查了一下,当使用手动跨步数组时,Dask 曾经有过有趣的行为。不过,这方面已经做了一些工作,因此您可能需要仔细检查是否已解决,在这种情况下,您可以愉快地并行化上述内容。
根据您对此答案和其他答案的进一步评论进行更新:
您可以分块进行此计算,以避免由于大量 bin(数量级为 1e4)而导致内存问题。将具体数字放入完整示例并添加进度条表明单个核心的运行时间<40 秒。
import numpy.ma as ma
from tqdm import trange
import numpy as np
import random
# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
chunk_size = 100
# preallocate buffers to avoid array creation in main loop
extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
mask_buffer = np.empty((chunk_size, len(idxs)), dtype=bool)
result = np.empty_like(bins, dtype=np.float64)
for low in trange(0, len(bins), chunk_size):
high = min(low + chunk_size, len(bins))
chunk_size = high - low
mask_buffer[:chunk_size, ...] = ~((bins[low:high, None] <= idxs[None, :]) & (extended_bins[low+1:high+1, None] > idxs[None, :]))
mask = mask_buffer[:chunk_size, ...]
restrided_weights = np.lib.stride_tricks.as_strided(weights, shape=mask.shape, strides=(0, idxs.strides[0]))
masked = ma.masked_array(restrided_weights, mask=mask, fill_value=np.nan, dtype=np.float64)
result[low:high] = masked.min(axis=-1).filled(np.nan)
Run Code Online (Sandbox Code Playgroud)
奖励:min
只有max
一个很酷的技巧可以使用:排序idxs
和weights
基于weights
(最小值升序,最大值降序)。这样,您可以立即查找最小/最大值,并可以完全避免屏蔽数组和自定义步幅。这依赖于一些没有很好记录的行为np.argmax
,它对布尔数组进行快速传递,并且不搜索整个数组。
它仅适用于这两种情况,对于更复杂的事情,您必须回退到上面的方法(平均),但对于这两种情况,它又削减了大约 70%,并且在单核时钟上运行的频率为 <12秒。
# fast min/max
from tqdm import trange
import numpy as np
# create example data
out_size = int(1.5e5)
#bins = np.arange(3, out_size-3)
bins = np.arange(3, int(3.8e4-3), dtype=np.int64)
idxs = np.arange(0, out_size)
random.shuffle(idxs)
# set duplicated slice manually for test
idxs[4] = idxs[3]
idxs[6] = idxs[7]
weights = idxs
order = np.argsort(weights)
weights_sorted = np.empty((len(weights) + 1), dtype=np.float64)
weights_sorted[:-1] = weights[order]
weights_sorted[-1] = np.nan
idxs_sorted = idxs[order]
extended_bins = np.empty(len(bins) + 1, dtype=bins.dtype)
extended_bins[:-1] = bins
extended_bins[-1] = np.iinfo(bins.dtype).max # last bin goes to infinity
# preallocate buffers to avoid array creation in main loop
chunk_size = 1000
mask_buffer = np.empty((chunk_size, len(idxs) + 1), dtype=bool)
mask_buffer[:, -1] = True
result = np.empty_like(bins, dtype=np.float64)
for low in trange(0, len(bins), chunk_size):
high = min(low + chunk_size, len(bins))
chunk_size = high - low
mask_buffer[:chunk_size, :-1] = (bins[low:high, None] <= idxs_sorted[None, :]) & (extended_bins[low+1:high+1, None] > idxs_sorted[None, :])
mask = mask_buffer[:chunk_size, ...]
weight_idx = np.argmax(mask, axis=-1)
result[low:high] = weights_sorted[weight_idx]
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
246 次 |
最近记录: |