Tho*_*wne 8 python numpy bigdata
假设我有一个长度为30的数组,其中包含4个错误值.我想为那些不好的值创建一个掩码,但由于我将使用滚动窗口函数,我还希望在每个坏值被标记为坏之后有一定数量的后续索引.在下面,n = 3:
我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上运行多次.因此,我需要尽可能接近numpy矢量化解决方案,因为我想避免python循环.
为了避免重新输入,这里是数组:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
Run Code Online (Sandbox Code Playgroud)
又一个答案!
它只需要你已经拥有的掩码并应用逻辑或自身的移位版本.很好地矢量化和疯狂快速!:d
def repeat_or(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
Run Code Online (Sandbox Code Playgroud)
不幸的是我无法m[i:] |= m[:-i]
运行...修改和使用掩码修改自己可能是一个坏主意.它确实有效m[:-i] |= m[i:]
,但这是错误的方向.
无论如何,我们现在有了斐波纳契式的增长,而不是二次增长,这仍然优于线性增长.
(我从未想过我实际上会编写一个与Fibonacci序列真正相关的算法而不会出现一些奇怪的数学问题.)
在"真实"条件下使用1e6和1e5 NANs数组进行测试:
In [5]: a = np.random.random(size=1e6)
In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan
In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop
In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop
In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop
In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop
In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop
Run Code Online (Sandbox Code Playgroud)
我将给托马斯留下更多的基准.
在这里使用基准测试结果。我已经包含了我自己的(“op”),我开始使用它,它遍历坏索引并将 1...n 添加到它们,然后使用唯一值来查找掩码索引。您可以在下面的代码中看到它以及所有其他响应。
无论如何,这是结果。面是沿 x (10 到 10e7) 的数组大小和沿 y (5, 50, 500, 5000) 的窗口大小。然后是每个方面的编码器,得分为 log-10,因为我们正在谈论微秒到分钟。
@swenzel 似乎是他的第二个答案的赢家,取代了 @moarningsun 的第一个答案(moarningsun 的第二个答案是通过大量内存使用使机器崩溃,但这可能是因为它不是为大 n 或非稀疏 a 设计的)。
由于(必要的)对数刻度,该图表并没有对这些贡献中最快的做出公正的评价。它们甚至比像样的循环解决方案快几十、几百倍。在最大的情况下,swenzel1 比 op 快 1000 倍,而且 op 已经在使用 numpy。
请注意,我使用了针对优化的 Intel MKL 库编译的 numpy 版本,该库充分利用了自 2012 年以来存在的 AVX 指令。在某些向量用例中,这会将 i7/Xeon 速度提高 5 倍。其中一些贡献可能比其他人受益更多。
这是迄今为止运行所有提交答案的完整代码,包括我自己的。函数 allagree() 确保结果正确,而 timeall() 将为您提供一个长格式的 Pandas Dataframe,其中所有结果都在几秒钟内。
你可以很容易地用新代码重新运行它,或者改变我的假设。请记住,我没有考虑其他因素,例如内存使用情况。另外,我对图形使用了 R ggplot2,因为我对 seaborn/matplotlib 的了解不够好,无法让它做我想做的事。
为完整起见,所有结果均一致:
In [4]: allagree(n = 7, asize = 777)
Out[4]:
AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0 \
AGML0 True True True True True True
AGML1 True True True True True True
askewchan0 True True True True True True
askewchan1 True True True True True True
askewchan2 True True True True True True
moarningsun0 True True True True True True
swenzel0 True True True True True True
swenzel1 True True True True True True
op True True True True True True
swenzel0 swenzel1 op
AGML0 True True True
AGML1 True True True
askewchan0 True True True
askewchan1 True True True
askewchan2 True True True
moarningsun0 True True True
swenzel0 True True True
swenzel1 True True True
op True True True
Run Code Online (Sandbox Code Playgroud)
感谢所有提交的人!
在 R 中使用 pd.to_csv 和 read.csv 导出 timeall() 的输出后的图形代码:
ww <- read.csv("ww.csv")
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")
Run Code Online (Sandbox Code Playgroud)
测试代码:
# test Stack Overflow 32706135 nan shift routines
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)
def AGML0(a, n): # loop itertools
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
return mask
def AGML1(a, n): # loop n
nn = n - 1
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+nn,dtype=bool)
for i in range(0, nn+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-nn]
return mask
def askewchan0(a, n):
m = np.isnan(a)
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
return np.bitwise_or.reduceat(m, ind.ravel())[::2]
def askewchan1(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
def askewchan2(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return morphology.binary_dilation(m, selem=s)
def moarningsun0(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
def moarningsun1(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
def swenzel0(a, n):
m = np.isnan(a)
k = m.copy()
for i in range(1, n):
k[i:] |= m[:-i]
return k
def swenzel1(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
def op(a, n):
m = np.isnan(a)
for x in range(1, n):
m = np.logical_or(m, np.r_[False, m][:-1])
return m
# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]
def allagree(fns = funcs, n = 10, asize = 100):
""" make sure result is the same from all functions """
fnames = [f.__name__ for f in fns]
a = np.random.rand(asize)
a[np.random.randint(0, asize, int(asize / 10))] = np.nan
results = dict([(f.__name__, f(a, n)) for f in fns])
isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
if not all([all(x) for x in isgood]):
print "not all results identical"
pdb.set_trace()
return pdgood
def timeone(f):
""" time one of the functions across the full range of a nd n """
print "Timing", f.__name__
Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
for i in range(len(As)): # 10% of points are always bad
As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
else np.nan for n in Ns] for a in As])
pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
return pdresults
def timeall(fns = funcs):
""" run timeone for all known funcs """
testd = dict([(x.__name__, timeone(x)) for x in fns])
testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
testdf.index.names = ["coder", "asize"]
testdf.columns.names = ["nsize"]
testdf.reset_index(inplace = True)
testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
return testdf
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
777 次 |
最近记录: |