使用多个自定义索引范围构建numpy数组,而无需显式循环

sno*_*key 4 python arrays performance numpy vectorization

在Numpy中,是否有一种pythonic方法来创建array3,其中自定义范围来自array1和array2而没有循环?迭代范围的直接解决方案可行,但由于我的数组遇到了数百万个项目,我正在寻找更有效的解决方案(也可能是语法糖).

例如,

array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])
array3 = np.concatenate([np.arange(array1[i], array2[i]) for i in
                         np.arange(0,len(array1))])

print array3
Run Code Online (Sandbox Code Playgroud)

结果:[10,11,12,13,65,66,67,68,69,200,201,202,203].

unu*_*tbu 5

假设范围不重叠,您可以构建一个非零的掩码,其中索引介于array1和之间指定的范围之间array2,然后用于np.flatnonzero获取索引数组 - 所需的array3:

import numpy as np

array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])

first, last = array1.min(), array2.max()
array3 = np.zeros(last-first+1, dtype='i1')
array3[array1-first] = 1
array3[array2-first] = -1
array3 = np.flatnonzero(array3.cumsum())+first
print(array3)
Run Code Online (Sandbox Code Playgroud)

产量

[ 10  11  12  13  65  66  67  68  69 200 201 202 203]
Run Code Online (Sandbox Code Playgroud)

对于大型len(array1),using_flatnonzero可以明显快于using_loop:

def using_flatnonzero(array1, array2):
    first, last = array1.min(), array2.max()
    array3 = np.zeros(last-first+1, dtype='i1')
    array3[array1-first] = 1
    array3[array2-first] = -1
    return np.flatnonzero(array3.cumsum())+first

def using_loop(array1, array2):
    return np.concatenate([np.arange(array1[i], array2[i]) for i in
                           np.arange(0,len(array1))])


array1, array2 = (np.random.choice(range(1, 11), size=10**4, replace=True)
                  .cumsum().reshape(2, -1, order='F'))


assert np.allclose(using_flatnonzero(array1, array2), using_loop(array1, array2))
Run Code Online (Sandbox Code Playgroud)
In [260]: %timeit using_loop(array1, array2)
100 loops, best of 3: 9.36 ms per loop

In [261]: %timeit using_flatnonzero(array1, array2)
1000 loops, best of 3: 564 µs per loop
Run Code Online (Sandbox Code Playgroud)

如果范围重叠,则using_loop返回array3包含重复的范围.using_flatnonzero返回一个没有重复的数组.


说明:让我们看一个小例子

array1 = np.array([10, 65, 200]) 
array2 = np.array([14, 70, 204])
Run Code Online (Sandbox Code Playgroud)

目标是构建一个goal如下所示的数组.1是位于索引值[ 10, 11, 12, 13, 65, 66, 67, 68, 69, 200, 201, 202, 203](即array3):

In [306]: goal
Out[306]: 
array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
       1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1], dtype=int8)
Run Code Online (Sandbox Code Playgroud)

一旦我们有了goal数组,array3就可以通过调用获得np.flatnonzero:

In [307]: np.flatnonzero(goal)
Out[307]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])
Run Code Online (Sandbox Code Playgroud)

goal长度与array2.max():

In [308]: array2.max()
Out[308]: 204

In [309]: goal.shape
Out[309]: (204,)
Run Code Online (Sandbox Code Playgroud)

所以我们可以从分配开始

goal = np.zeros(array2.max()+1, dtype='i1')
Run Code Online (Sandbox Code Playgroud)

然后在由以下给出的指数array1和-1处给出的索引位置填充1 array2:

In [311]: goal[array1] = 1
In [312]: goal[array2] = -1
In [313]: goal
Out[313]: 
array([ 0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,  0, -1,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,
        0,  0, -1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
        0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  1,  0,  0,  0,
       -1], dtype=int8)
Run Code Online (Sandbox Code Playgroud)

现在应用cumsum(累积和)产生所需的goal数组:

In [314]: goal = goal.cumsum(); goal
Out[314]: 
array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
       1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0])

In [315]: np.flatnonzero(goal)
Out[315]: array([ 10,  11,  12,  13,  65,  66,  67,  68,  69, 200, 201, 202, 203])
Run Code Online (Sandbox Code Playgroud)

这是背后的主要思想using_flatnonzero.减法first只是为了节省一点内存.


Div*_*kar 3

前瞻性方法

\n

我将回顾如何解决这个问题。

\n

采取问题中列出的样本。我们有 -

\n
array1 = np.array([10, 65, 200])\narray2 = np.array([14, 70, 204])\n
Run Code Online (Sandbox Code Playgroud)\n

现在,看看想要的结果 -

\n
result: [10,11,12,13,65,66,67,68,69,200,201,202,203]\n
Run Code Online (Sandbox Code Playgroud)\n

让我们计算组长度,因为我们接下来需要它们来解释解决方案。

\n
In [58]: lens = array2 - array1\n\nIn [59]: lens\nOut[59]: array([4, 5, 4])\n
Run Code Online (Sandbox Code Playgroud)\n

这个想法是使用1\ 的初始化数组,当对整个长度进行累积求和时,将为我们提供所需的结果。\n此累积求和将是我们解决方案的最后一步。\n为什么要1初始化?好吧,因为我们有一个以 \'s 为步长的数组,1除了在特定位置\n我们有相应的新组的变化。

\n

现在,因为这cumsum是最后一步,所以它之前的步骤应该给我们类似的东西 -

\n
array([ 10,   1,   1,   1,  52,   1,   1,   1,   1, 131,   1,   1,   1])\n
Run Code Online (Sandbox Code Playgroud)\n

正如之前所讨论的,它在特定的地方1充满[10,52,131]。这10似乎是从 中的第一个元素进来的array1,但是其余的呢?\n第二个元素作为(查看)52进来,并且由于第一个元素的长度而进入以 开头并运行的组中\n团体。因此,如果我们这样做,我们将得到,然后添加以适应边界停止,我们将得到,这是所需的移位值。同样,我们会得到.65-13result1310465 - 10 - 451152131

\n

因此,shifting-values可以计算这些,就像这样 -

\n
In [62]: np.diff(array1) - lens[:-1]+1\nOut[62]: array([ 52, 131])\n
Run Code Online (Sandbox Code Playgroud)\n

接下来,为了获得shifting-places发生此类转变的那些,我们可以简单地对组长度进行累积求和 -

\n
In [65]: lens[:-1].cumsum()\nOut[65]: array([4, 9])\n
Run Code Online (Sandbox Code Playgroud)\n

为了完整起见,我们需要预先附加0数组 ofshifting-placesarray1[0]for shifting-values

\n

因此,我们准备以分步的形式展示我们的方法!

\n
\n

把碎片放回去

\n

1]获取每组的长度:

\n
lens = array2 - array1\n
Run Code Online (Sandbox Code Playgroud)\n

2] 获取发生移位的索引以及要放入1\ 的初始化数组中的值:

\n
shift_idx = np.hstack((0,lens[:-1].cumsum()))\nshift_vals = np.hstack((array1[0],np.diff(array1) - lens[:-1]+1))\n
Run Code Online (Sandbox Code Playgroud)\n

3] 设置1初始化的 ID 数组,用于在之前步骤中列出的索引处插入这些值:

\n
id_arr = np.ones(lens.sum(),dtype=array1.dtype)\nid_arr[shift_idx] = shift_vals\n
Run Code Online (Sandbox Code Playgroud)\n

4]最后对ID数组进行累加求和:

\n
output = id_arr.cumsum() \n
Run Code Online (Sandbox Code Playgroud)\n

以函数格式列出,我们将有 -

\n
def using_ones_cumsum(array1, array2):\n    lens = array2 - array1\n    shift_idx = np.hstack((0,lens[:-1].cumsum()))\n    shift_vals = np.hstack((array1[0],np.diff(array1) - lens[:-1]+1))\n    id_arr = np.ones(lens.sum(),dtype=array1.dtype)\n    id_arr[shift_idx] = shift_vals\n    return id_arr.cumsum()\n  \n
Run Code Online (Sandbox Code Playgroud)\n

它也适用于重叠范围!

\n
In [67]: array1 = np.array([10, 11, 200]) \n    ...: array2 = np.array([14, 18, 204])\n    ...: \n\nIn [68]: using_ones_cumsum(array1, array2)\nOut[68]: \narray([ 10,  11,  12,  13,  11,  12,  13,  14,  15,  16,  17, 200, 201,\n       202, 203])\n
Run Code Online (Sandbox Code Playgroud)\n
\n

运行时测试

\n

让我们将所提出的方法与 中的其他矢量化方法进行比较@unutbu\'s flatnonzero based solution,该方法已被证明比循环方法要好得多 -

\n
In [38]: array1, array2 = (np.random.choice(range(1, 11), size=10**4, replace=True)\n    ...:                   .cumsum().reshape(2, -1, order=\'F\'))\n\nIn [39]: %timeit using_flatnonzero(array1, array2)\n1000 loops, best of 3: 889 \xc2\xb5s per loop\n\nIn [40]: %timeit using_ones_cumsum(array1, array2)\n1000 loops, best of 3: 235 \xc2\xb5s per loop\n
Run Code Online (Sandbox Code Playgroud)\n
\n

改进!

\n

现在,按代码方式 NumPy 不喜欢附加。因此,np.hstack对于稍微改进的版本,可以避免这些调用,如下所示 -

\n
def get_ranges_arr(starts,ends):\n    counts = ends - starts\n    counts_csum = counts.cumsum()\n    id_arr = np.ones(counts_csum[-1],dtype=int)\n    id_arr[0] = starts[0]\n    id_arr[counts_csum[:-1]] = starts[1:] - ends[:-1] + 1\n    return id_arr.cumsum()\n
Run Code Online (Sandbox Code Playgroud)\n

让我们根据我们最初的方法来计算时间 -

\n
In [151]: array1,array2 = (np.random.choice(range(1, 11),size=10**4, replace=True)\\\n     ...:                                      .cumsum().reshape(2, -1, order=\'F\'))\n\nIn [152]: %timeit using_ones_cumsum(array1, array2)\n1000 loops, best of 3: 276 \xc2\xb5s per loop\n\nIn [153]: %timeit get_ranges_arr(array1, array2)\n10000 loops, best of 3: 193 \xc2\xb5s per loop\n
Run Code Online (Sandbox Code Playgroud)\n

所以,我们的30%性能得到了提升!

\n