如何加速 numpy.unique 并提供计数和重复行索引

C. *_*ney 6 python arrays numpy numba

我试图在 numpy 数组中查找重复的行。以下代码复制了我的数组的结构,该数组有 n 行、m 列、每行 nz 个非零条目:

import numpy as np
import random
import datetime


def create_mat(n, m, nz):
    sample_mat = np.zeros((n, m), dtype='uint8')
    random.seed(42)
    for row in range(0, n):
        counter = 0
        while counter < nz:
            random_col = random.randrange(0, m-1, 1)
            if sample_mat[row, random_col] == 0:
                sample_mat[row, random_col] = 1
                counter += 1
    test = np.all(np.sum(sample_mat, axis=1) == nz)
    print(f'All rows have {nz} elements: {test}')
    return sample_mat
Run Code Online (Sandbox Code Playgroud)

我尝试优化的代码如下:

if __name__ == '__main__':
    threshold = 2
    mat = create_mat(1800000, 108, 8)

    print(f'Time: {datetime.datetime.now()}')
    unique_rows, _, duplicate_counts = np.unique(mat, axis=0, return_counts=True, return_index=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Unique rows: {len(unique_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unique_rows[0:5])

Run Code Online (Sandbox Code Playgroud)

我的输出如下:

All rows have 8 elements: True
Time: 2022-06-29 12:08:07.320834
Time: 2022-06-29 12:08:23.281633
Unique rows: 1799994 Sample inds: [508991, 553136, 930379, 1128637, 1290356] Sample counts: [1 1 1 1 1]
Sample rows:
[[0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 1 1 0 0 1 1 0 0 0 1 0 0 0 0 0 0 1 0 1 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 1 1 0 0 0 0 0 1 1 1 1 0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 1 0 1 0 0 1 0 0 0 1 0 1 0 1 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 1 0 1 1 0 0 0 0 1 1 0 0 0 0 0 0 1 1 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 1 0 0 0 1 0 1 1 0 0 0 0 0 1 0 0 0 0 1 0]]
Run Code Online (Sandbox Code Playgroud)

我考虑过使用 numba,但挑战是它不使用轴参数进行操作。类似地,转换为列表和使用集合是一种选择,但随后循环执行重复计数似乎“unpythonic”。

鉴于我需要多次运行此代码(因为我正在修改 numpy 数组,然后需要重新搜索重复项),所以时间至关重要。我也尝试过针对此步骤使用多处理,但 np.unique 似乎会阻塞(即,即使当我尝试运行此步骤的多个版本时,我最终也会被限制为一个线程以 6% CPU 容量运行,而其他线程则处于阻塞状态)闲置的)。

Jér*_*ard 7

步骤一:位打包

由于您的矩阵仅包含二进制值,因此您可以积极地将这些位打包为 uint64 值,以便执行更有效的排序。这是一个 Numba 实现:

import numpy as np
import numba as nb

@nb.njit('(uint8[:,::1],)', parallel=True)
def pack_bits(mat):
    n, m = mat.shape
    res = np.zeros((n, (m+63)//64), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(0)
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    val += np.uint64(mat[i, bj+j]) << (63 - j)
            res[i, bj//64] = val
    return res

@nb.njit('(uint64[:,::1], int_)', parallel=True)
def unpack_bits(mat, m):
    n = mat.shape[0]
    assert mat.shape[1] == (m+63)//64
    res = np.zeros((n, m), np.uint64)
    for i in nb.prange(n):
        for bj in range(0, m, 64):
            val = np.uint64(mat[i, bj//64])
            if bj + 64 <= m:
                # Fast case
                for j in range(64):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
            else:
                # Slow case (boundary)
                for j in range(m - bj):
                    res[i, bj+j] = np.uint8((val >> (63 - j)) & 1)
    return res
Run Code Online (Sandbox Code Playgroud)

np.unique函数可以在更小的打包数组上调用,就像在初始代码中一样(除了生成的排序数组是打包数组并且需要解包)。由于您不需要索引,因此最好不要计算它。这样,return_index=True就可以去除了。此外,只能解包所需的值(解包比打包更昂贵,因为写入大矩阵比读取现有矩阵更昂贵)。

if __name__ == '__main__':
    threshold = 2
    n, m = 1800000, 108
    mat = create_mat(n, m, 8)

    print(f'Time: {datetime.datetime.now()}')
    packed_mat = pack_bits(mat)
    duplicate_packed_rows, duplicate_counts = np.unique(packed_mat, axis=0, return_counts=True)
    duplicate_indices = [int(x) for x in np.argwhere(duplicate_counts >= threshold)]
    print(f'Time: {datetime.datetime.now()}')

    print(f'Duplicate rows: {len(duplicate_rows)} Sample inds: {duplicate_indices[0:5]} Sample counts: {duplicate_counts[0:5]}')
    print(f'Sample rows:')
    print(unpack_bits(duplicate_packed_rows[0:5], m))
Run Code Online (Sandbox Code Playgroud)

第 2 步:np.unique优化

np.unique调用不是最优的,因为它执行多个昂贵的内部排序步骤。您的具体情况并不需要所有这些,并且可以优化某些步骤。

更有效的实现包括在第一步中对最后一列进行排序,然后对前一列进行排序,依此类推,直到第一列的排序与基数排序类似。请注意,最后一列可以使用不稳定的算法(通常更快)进行排序,但其他列需要稳定的算法。此方法仍然不是最佳的,因为argsort调用速度很慢并且当前的实现尚未使用多线程。不幸的是,Numpy 还没有证明任何有效的方法来对 2D 数组的行进行排序。虽然可以在 Numba 中重新实现这一点,但这很麻烦,做起来有点棘手,而且容易出错。更不用说与本机 C/C++ 代码相比,Numba 会带来一些开销。排序后,可以跟踪和计算唯一/重复的行。这是一个实现:

def sort_lines(mat):
    n, m = mat.shape

    for i in range(m):
        kind = 'stable' if i > 0 else None
        mat = mat[np.argsort(mat[:,m-1-i], kind=kind)]

    return mat

@nb.njit('(uint64[:,::1],)', parallel=True)
def find_duplicates(sorted_mat):
    n, m = sorted_mat.shape
    assert m >= 0

    isUnique = np.zeros(n, np.bool_)
    uniqueCount = 1
    if n > 0:
        isUnique[0] = True
    for i in nb.prange(1, n):
        isUniqueVal = False
        for j in range(m):
            isUniqueVal |= sorted_mat[i, j] != sorted_mat[i-1, j]
        isUnique[i] = isUniqueVal
        uniqueCount += isUniqueVal

    uniqueValues = np.empty((uniqueCount, m), np.uint64)
    duplicateCounts = np.zeros(len(uniqueValues), np.uint64)

    cursor = 0
    for i in range(n):
        cursor += isUnique[i]
        for j in range(m):
            uniqueValues[cursor-1, j] = sorted_mat[i, j]
        duplicateCounts[cursor-1] += 1

    return uniqueValues, duplicateCounts
Run Code Online (Sandbox Code Playgroud)

之前的np.unique调用可以替换为find_duplicates(sort_lines(packed_mat)).

第3步:基于GPUnp.unique

虽然使用 Numba 和 Numpy 在 CPU 上实现快速算法来排序行并不容易,但可以简单地使用 CuPy 在 GPU 上执行此操作,假设 Nvidia GPU 可用并且已安装 CUDA(以及 CuPy)。该解决方案的优点是简单且效率显着提高。这是一个例子:

import cupy as cp

def cupy_sort_lines(mat):
    cupy_mat = cp.array(mat)
    return cupy_mat[cp.lexsort(cupy_mat.T[::-1,:])].get()
Run Code Online (Sandbox Code Playgroud)

之前的sort_lines调用可以替换为cupy_sort_lines.


结果

以下是我的机器上的时序,该机器配备 6 核 i5-9600KF CPU 和 Nvidia 1660 Super GPU:

Initial version:        15.541 s
Optimized packing:       0.982 s
Optimized np.unique:     0.634 s
GPU-based sorting:       0.143 s   (require a Nvidia GPU)
Run Code Online (Sandbox Code Playgroud)

因此,基于 CPU 的优化版本大约快了 25 倍,基于 GPU 的优化版本快了 109 倍。请注意,在所有版本中排序都会花费大量时间。另外,请注意,解包不包含在基准测试中(如提供的代码中所示)。只要解压缩几行而不是所有完整数组,所需的时间可以忽略不计(在我的机器上大约需要 200 毫秒)。最后一个操作可以进一步优化,但代价是实现更加复杂。