比较数组补丁的最快方法是什么?

mij*_*ijc 19 python arrays performance numpy cython

我想比较一个二维数组$ A $的不同区域和一个较小尺寸的给定数组$ b $.由于我必须做很多次,因此执行速度非常快是至关重要的.我有一个解决方案,工作正常,但我希望它可以做得更好,更快.

详细地:

假设我们有一个大数组和一个小数组.我遍历大数组中与小数组大小相同的所有可能"补丁",并将这些补丁与给定的小数组进行比较.

def get_best_fit(big_array, small_array):

    # we assume the small array is square
    patch_size = small_array.shape[0]
    min_value = np.inf
    for x in range(patch_size, big_array.shape[0] - patch_size):
        for y in range(patch_size, big_array.shape[1] - patch_size):
            p = get_patch_term(x, y, patch_size, big_array)
            tmp = some_metric(p, small_array)
            if min_value > tmp:
                min_value = tmp
                min_patch = p

    return min_patch, min_value
Run Code Online (Sandbox Code Playgroud)

为了获得补丁,我得到了这个直接阵列访问实现:

def get_patch_term(x, y, patch_size, data):
    """
    a patch has the size (patch_size)^^2
    """
    patch = data[(x - (patch_size-1)/2): (x + (patch_size-1)/2 + 1),
                 (y - (patch_size-1)/2): (y + (patch_size-1)/2 + 1)]
    return patch
Run Code Online (Sandbox Code Playgroud)

我想这是最重要的任务,可以更快地执行,但我不确定.

我看了一下Cython,但也许我做错了,我对它并不熟悉.

我的第一次尝试是直接转换为cython:

def get_patch_term_fast(Py_ssize_t x_i, Py_ssize_t y_i,
                        Py_ssize_t patch_size,
                        np.ndarray[DTYPE_t, ndim=2] big_array):

    assert big_array.dtype == DTYPE
    patch_size = (patch_size - 1)/2

    cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
    return patch
Run Code Online (Sandbox Code Playgroud)

这似乎更快(见下文),但我认为并行方法应该更好,所以我想出了这个

def get_patch_term_fast_parallel(Py_ssize_t x_i, Py_ssize_t y_i,
                                 Py_ssize_t patch_size,
                                 np.ndarray[DTYPE_t, ndim=2] big_array):

    assert big_array.dtype == DTYPE
    patch_size = (patch_size - 1)/2

    assert big_array.dtype == DTYPE
    cdef Py_ssize_t x
    cdef Py_ssize_t y


    cdef np.ndarray[DTYPE_t, ndim=1] patch = np.empty(np.power((2 * patch_size) + 1, 2))
    with nogil, parallel():
        for x in prange(x_i - patch_size, x_i + patch_size + 1):
            for y in prange(y_i - patch_size, y_i + patch_size + 1):
                patch[((x - (x_i - patch_size)) * (2 * patch_size + 1)) + (y - (y_i - patch_size))] = big_array[x, y]
    #cdef np.ndarray[DTYPE_t, ndim=2] patch = <np.ndarray[DTYPE_t, ndim=2]>big_array[(x_i - patch_size):(x_i + patch_size + 1), (y_i - patch_size): (y_i + patch_size + 1)]
    return patch
Run Code Online (Sandbox Code Playgroud)

不幸的是,这不是更快.为了测试我用过:

A = np.array(range(1200), dtype=np.float).reshape(30, 40)
b = np.array([41, 42, 81, 84]).reshape(2, 2)

x = 7
y = 7
print(timeit.timeit(lambda:get_patch_term_fast(x, y, 3, A), number=300))
print(timeit.timeit(lambda:get_patch_term_fast_parallel(x, y, 3, A).reshape(3,3), number=300))
print(timeit.timeit(lambda:get_patch_term(x, y, 3, A), number=300))
Run Code Online (Sandbox Code Playgroud)

这使

0.0008792859734967351
0.0029909340664744377
0.0029337930027395487
Run Code Online (Sandbox Code Playgroud)

所以,我的第一个问题是,是否可以更快地完成它?第二个问题是,为什么并行方法不比原来的numpy实现快?

编辑:

我试图进一步并行化get_best_fit函数,但遗憾的是我收到了很多错误,指出我不能在没有gil的情况下分配Python对象.

这是代码:

def get_best_fit_fast(np.ndarray[DTYPE_t, ndim=2] big_array,
                      np.ndarray[DTYPE_t, ndim=2] small_array):

    # we assume the small array is square
    cdef Py_ssize_t patch_size = small_array.shape[0]

    cdef Py_ssize_t x
    cdef Py_ssize_t y

    cdef Py_ssize_t x_range = big_array.shape[0] - patch_size
    cdef Py_ssize_t y_range = big_array.shape[1] - patch_size

    cdef np.ndarray[DTYPE_t, ndim=2] p
    cdef np.ndarray[DTYPE_t, ndim=2] weights = np.empty((x_range - patch_size)*(y_range - patch_size)).reshape((x_range - patch_size), (y_range - patch_size))

    with nogil, parallel():
        for x in prange(patch_size, x_range):
            for y in prange(patch_size, y_range):
                p = get_patch_term_fast(x, y, patch_size, big_array)
                weights[x - patch_size, y - patch_size] = np.linalg.norm(np.abs(p - small_array))

    return np.min(weights)
Run Code Online (Sandbox Code Playgroud)

PS:我省略了返回最小补丁的部分......

Nik*_* M. 0

初始发布了另一个基于模式匹配的答案(被标题带走),发布了另一个答案

使用一个循环遍历两个数组(大数组和小数组)并应用部分相关度量(或其他),而无需始终对列表进行切片:

作为旁注,请观察这样一个事实(当然取决于指标):一旦一个补丁完成,下一个补丁(向下一行或向右一列)与前一个补丁共享很多内容,仅其初始行和最后行(或因此,通过减去前一行并添加新行(或反之亦然),可以从先前的相关性中更快地计算新的相关性。由于未给出度量函数,因此添加了观察值。

def get_best_fit(big_array, small_array):

    # we assume the small array is square
    patch_size = small_array.shape[0]
    x = 0 
    y = 0
    x2 = 0 
    y2 = 0
    W = big_array.shape[0]
    H = big_array.shape[1]
    W2 = patch_size
    H2 = patch_size
    min_value = np.inf
    tmp = 0
    min_patch = None
    start = 0 
    end = H*W
    start2 = 0
    end2 = W2*H2
    while start < end:

        tmp = 0
        start2 = 0
        x2 = 0 
        y2 = 0
        valid = True
        while start2 < end2:
            if x+x2 >= W or y+y2 >= H: 
                valid = False
                break
            # !!compute partial metric!!
            # no need to slice lists all the time
            tmp += some_metric_partial(x, y, x2, y2, big_array, small_array)
            x2 += 1 
            if x2>=W2: 
                x2 = 0 
                y2 += 1
            start2 += 1

        # one patch covered
        # try next patch
        if valid and min_value > tmp:
            min_value = tmp
            min_patch = [x, y, W2, H2]

        x += 1 
        if x>=W: 
            x = 0 
            y += 1

        start += 1

    return min_patch, min_value
Run Code Online (Sandbox Code Playgroud)