有效地为每个坏值将n个单元格向右延伸n个单元格

Tho*_*wne 8 python numpy bigdata

假设我有一个长度为30的数组,其中包含4个错误值.我想为那些不好的值创建一个掩码,但由于我将使用滚动窗口函数,我还希望在每个坏值被标记为坏之后有一定数量的后续索引.在下面,n = 3:

在此输入图像描述

我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上运行多次.因此,我需要尽可能接近numpy矢量化解决方案,因为我想避免python循环.

为了避免重新输入,这里是数组:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
Run Code Online (Sandbox Code Playgroud)

swe*_*zel 6

又一个答案!
它只需要你已经拥有的掩码并应用逻辑或自身的移位版本.很好地矢量化和疯狂快速!:d

def repeat_or(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]

    return k
Run Code Online (Sandbox Code Playgroud)

不幸的是我无法m[i:] |= m[:-i]运行...修改和使用掩码修改自己可能是一个坏主意.它确实有效m[:-i] |= m[i:],但这是错误的方向.
无论如何,我们现在有了斐波纳契式的增长,而不是二次增长,这仍然优于线性增长.
(我从未想过我实际上会编写一个与Fibonacci序列真正相关的算法而不会出现一些奇怪的数学问题.)

在"真实"条件下使用1e6和1e5 NANs数组进行测试:

In [5]: a = np.random.random(size=1e6)

In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan

In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop

In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop

In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop

In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop

In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop
Run Code Online (Sandbox Code Playgroud)

我将给托马斯留下更多的基准.


Tho*_*wne 5

在这里使用基准测试结果。我已经包含了我自己的(“op”),我开始使用它,它遍历坏索引并将 1...n 添加到它们,然后使用唯一值来查找掩码索引。您可以在下面的代码中看到它以及所有其他响应。

无论如何,这是结果。面是沿 x (10 到 10e7) 的数组大小和沿 y (5, 50, 500, 5000) 的窗口大小。然后是每个方面的编码器,得分为 log-10,因为我们正在谈论微秒到分钟。

在此处输入图片说明

@swenzel 似乎是他的第二个答案的赢家,取代了 @moarningsun 的第一个答案(moarningsun 的第二个答案是通过大量内存使用使机器崩溃,但这可能是因为它不是为大 n 或非稀疏 a 设计的)。

由于(必要的)对数刻度,该图表并没有对这些贡献中最快的做出公正的评价。它们甚至比像样的循环解决方案快几十、几百倍。在最大的情况下,swenzel1 比 op 快 1000 倍,而且 op 已经在使用 numpy。

请注意,我使用了针对优化的 Intel MKL 库编译的 numpy 版本,该库充分利用了自 2012 年以来存在的 AVX 指令。在某些向量用例中,这会将 i7/Xeon 速度提高 5 倍。其中一些贡献可能比其他人受益更多。

这是迄今为止运行所有提交答案的完整代码,包括我自己的。函数 allagree() 确保结果正确,而 timeall() 将为您提供一个长格式的 Pandas Dataframe,其中所有结果都在几秒钟内。

你可以很容易地用新代码重新运行它,或者改变我的假设。请记住,我没有考虑其他因素,例如内存使用情况。另外,我对图形使用了 R ggplot2,因为我对 seaborn/matplotlib 的了解不够好,无法让它做我想做的事。

为完整起见,所有结果均一致:

In [4]: allagree(n = 7, asize = 777)
Out[4]:
             AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0  \
AGML0         True  True       True       True       True         True
AGML1         True  True       True       True       True         True
askewchan0    True  True       True       True       True         True
askewchan1    True  True       True       True       True         True
askewchan2    True  True       True       True       True         True
moarningsun0  True  True       True       True       True         True
swenzel0      True  True       True       True       True         True
swenzel1      True  True       True       True       True         True
op            True  True       True       True       True         True

             swenzel0 swenzel1    op
AGML0            True     True  True
AGML1            True     True  True
askewchan0       True     True  True
askewchan1       True     True  True
askewchan2       True     True  True
moarningsun0     True     True  True
swenzel0         True     True  True
swenzel1         True     True  True
op               True     True  True
Run Code Online (Sandbox Code Playgroud)

感谢所有提交的人!

在 R 中使用 pd.to_csv 和 read.csv 导出 timeall() 的输出后的图形代码:

ww <- read.csv("ww.csv")    
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")
Run Code Online (Sandbox Code Playgroud)

测试代码:

# test Stack Overflow 32706135 nan shift routines

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)


def AGML0(a, n):                               # loop itertools
    maskleft = np.where(np.isnan(a))[0]
    maskright = maskleft + n
    mask = np.zeros(len(a),dtype=bool)
    for l,r in itertools.izip(maskleft,maskright): 
        mask[l:r] = True
    return mask


def AGML1(a, n):                               # loop n
    nn = n - 1
    maskleft = np.where(np.isnan(a))[0]
    ghost_mask = np.zeros(len(a)+nn,dtype=bool)
    for i in range(0, nn+1):
        thismask = maskleft + i
        ghost_mask[thismask] = True
    mask = ghost_mask[:len(ghost_mask)-nn]
    return mask


def askewchan0(a, n):
    m = np.isnan(a)
    i = np.arange(1, len(m)+1)
    ind = np.column_stack([i-n, i]) # may be a faster way to generate this
    ind.clip(0, len(m)-1, out=ind)
    return np.bitwise_or.reduceat(m, ind.ravel())[::2]


def askewchan1(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))


def askewchan2(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return morphology.binary_dilation(m, selem=s)


def moarningsun0(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0


def moarningsun1(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask


def swenzel0(a, n):
    m = np.isnan(a)
    k = m.copy()
    for i in range(1, n):
        k[i:] |= m[:-i]
    return k


def swenzel1(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]
    return k


def op(a, n):
    m = np.isnan(a)
    for x in range(1, n):
        m = np.logical_or(m, np.r_[False, m][:-1])
    return m


# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]

def allagree(fns = funcs, n = 10, asize = 100):
    """ make sure result is the same from all functions """
    fnames = [f.__name__ for f in fns]
    a = np.random.rand(asize)
    a[np.random.randint(0, asize, int(asize / 10))] = np.nan
    results = dict([(f.__name__, f(a, n)) for f in fns])
    isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
    pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
    if not all([all(x) for x in isgood]):
        print "not all results identical"
        pdb.set_trace()
    return pdgood


def timeone(f):
    """ time one of the functions across the full range of a nd n """
    print "Timing", f.__name__
    Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
    As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
    for i in range(len(As)): # 10% of points are always bad
        As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
    results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
                        else np.nan for n in Ns] for a in As])
    pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
    return pdresults


def timeall(fns = funcs):
    """ run timeone for all known funcs """
    testd = dict([(x.__name__, timeone(x)) for x in fns])
    testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
    testdf.index.names = ["coder", "asize"]
    testdf.columns.names = ["nsize"]
    testdf.reset_index(inplace = True)
    testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
    return testdf
Run Code Online (Sandbox Code Playgroud)

  • 从问题中的图片中,我期望有一个相当不错的基准概述......你提供了:) (2认同)