从第三个数组中的两个数组中有效地获得每对元素的最小值

Gab*_*iel 5 python arrays performance numpy

我有两个N浮点数组(作为(x,y)坐标,可能有重复)和相关zN浮点数组(作为坐标的权重).

对于每(x,y)对浮点数,我需要选择具有最小关联z值的对.我已经定义了一个selectMinz()执行此操作的函数(请参阅下面的代码),但这需要太长时间.

我怎样才能提高这个功能的性能?

import numpy as np
import time


def getData():
    N = 100000
    x = np.arange(0.0005, 0.03, 0.001)
    y = np.arange(6., 10., .05)
    # Select N values for x,y, where values can be repeated
    x = np.random.choice(x, N)
    y = np.random.choice(y, N)
    z = np.random.uniform(10., 15., N)
    return x, y, z


def selectMinz(x, y, z):
    """
    Select the minimum z for each (x,y) pair.
    """
    xy_unq, z_unq = [], []
    # For each (x,y) pair
    for i, xy in enumerate(zip(*[x, y])):
        # If this xy pair was already stored in the xy_unq list
        if xy in xy_unq:
            # If the stored z value associated with this xy pair is
            # larger than this new z[i] value
            if z_unq[xy_unq.index(xy)] > z[i]:
                # Store this smaller value instead
                z_unq[xy_unq.index(xy)] = z[i]
        else:
            # Store the xy pair, and its associated z value
            xy_unq.append(xy)
            z_unq.append(z[i])

    return xy_unq, z_unq


# Define data with the proper format.
x, y, z = getData()

s = time.clock()
xy_unq, z_unq = selectMinz(x, y, z)  # <-- TAKES TOO LONG (~15s in my system)
print(time.clock() - s)
Run Code Online (Sandbox Code Playgroud)

Div*_*kar 3

脚步 :

\n\n
    \n
  1. 用于lex-sort使x-y配对按顺序出现。或者,我们可以使用缩放方法将其中一个数组按另一个数组的值范围进行缩放,然后与另一个数组相加,最后使用argsort获取按 lex 排序的等效索引。
  2. \n
  3. 使用np.minimum.reduceat获取由对分组定义的间隔中的最小值。
  4. \n
\n\n

因此,我们将拥有一个矢量化解决方案,如下所示 -

\n\n
def selectMinz_vectorized(x, y, z):\n    # Get grouped lex-sort indices\n    sidx = (y + x*(y.max() - y.min() + 1)).argsort()\n    # or sidx = np.lexsort([x, y])\n\n    # Lex-sort x, y, z\n    x_sorted = x[sidx]\n    y_sorted = y[sidx]\n    z_sorted = z[sidx]\n\n    # Get equality mask between each sorted X and Y elem against previous ones.\n    # The non-zero indices of its inverted mask gives us the indices where the \n    # new groupings start. We are calling those as cut_idx.\n    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])\n    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))\n\n    # Use those cut_idx to get intervalled minimum values\n    minZ = np.minimum.reduceat(z_sorted, cut_idx)\n\n    # Make tuples of the groupings of x,y and the corresponding min Z values\n    return (zip(x_sorted[cut_idx], y_sorted[cut_idx]), minZ.tolist())\n
Run Code Online (Sandbox Code Playgroud)\n\n

样本运行 -

\n\n
In [120]: np.c_[x,y,z]\nOut[120]: \narray([[  0.,   1.,  69.],\n       [  2.,   0.,  47.],\n       [  1.,   0.,  62.],\n       [  0.,   2.,  33.],\n       [  1.,   7.,  32.],\n       [  1.,   0.,  50.],\n       [  2.,   0.,  55.]])\n\nIn [121]: selectMinz(x,y,z) # original method\nOut[121]: \n([(0.0, 1.0), (2.0, 0.0), (1.0, 0.0), (0.0, 2.0), (1.0, 7.0)],\n [69.0, 47.0, 50.0, 33.0, 32.0])\n\nIn [122]: selectMinz_vectorized(x,y,z)\nOut[122]: \n([(1.0, 0.0), (2.0, 0.0), (0.0, 1.0), (0.0, 2.0), (1.0, 7.0)],\n [50.0, 47.0, 69.0, 33.0, 32.0])\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

这是我最初的方法,涉及创建一个堆叠数组,然后执行这些操作。实现看起来像这样 -

\n\n
def selectMinz_vectorized_v2(x, y, z):\n    d = np.column_stack((x,y,z))\n    sidx = np.lexsort(d[:,:2].T)\n    b = d[sidx]  \n    cut_idx = np.r_[0,np.flatnonzero(~(b[1:,:2] == b[:-1,:2]).all(1))+1]\n    minZ = np.minimum.reduceat(b[:,-1], cut_idx)\n    return ([tuple(i) for i in b[cut_idx,:2]], minZ.tolist())\n
Run Code Online (Sandbox Code Playgroud)\n\n
\n\n

矢量化方法的基准测试

\n\n

方法 -

\n\n
# Pruned version of the approach posted earlier\ndef selectMinz_vectorized_pruned(x, y, z):\n    sidx = (y + x*(y.max() - y.min() + 1)).argsort()\n    x_sorted = x[sidx]\n    y_sorted = y[sidx]\n    z_sorted = z[sidx]\n    seq_eq_mask = (x_sorted[1:] == x_sorted[:-1]) & (y_sorted[1:] == y_sorted[:-1])\n    cut_idx = np.flatnonzero(np.concatenate(( [True], ~seq_eq_mask)))\n    minZ = np.minimum.reduceat(z_sorted, cut_idx)\n    return x_sorted[cut_idx], y_sorted[cut_idx], minZ\n\ndef numpy_indexed_app(x,y,z): # @Eelco Hoogendoorn's soln\n    return npi.group_by((x, y)).min(z)\n
Run Code Online (Sandbox Code Playgroud)\n\n

时间安排 -

\n\n
In [141]: x,y,z=getData(10000)\n\nIn [142]: %timeit selectMinz_vectorized_pruned(x, y, z)\n     ...: %timeit numpy_indexed_app(x,y,z)\n     ...: \n1000 loops, best of 3: 763 \xc2\xb5s per loop\n1000 loops, best of 3: 1.09 ms per loop\n\nIn [143]: x,y,z=getData(100000)\n\nIn [144]: %timeit selectMinz_vectorized_pruned(x, y, z)\n     ...: %timeit numpy_indexed_app(x,y,z)\n     ...: \n100 loops, best of 3: 8.53 ms per loop\n100 loops, best of 3: 12.9 ms per loop\n
Run Code Online (Sandbox Code Playgroud)\n

  • @NimishBansal 只是通过在这里回答 MATLAB/NumPy 问题并以最虔诚的方式避免循环;) (2认同)