加速寻找最佳分割线

Rap*_*ael 8 python algorithm optimization performance

这个编码问题源自这个问题

考虑一个 n × n 的整数网格。任务是在网格上画一条直线,使包含左上角的部分之和达到尽可能大的数字。这是得分为 45 的最佳解决方案的图片:

在此输入图像描述

如果其中间位于线上或线上,则我们在要求和的部分中包含一个正方形。上面是指包括网格左上角的部分。(为了使这个定义更清楚,任何线都不能完全从网格的左上角开始。)

任务是选择一条线,使包含左上角正方形的部分的总和最大化。该线必须从一侧直线到另一侧。该线可以在一侧的任何位置开始或结束,而不仅仅是在整数点处。

给出的Python代码是:

import numpy as np
import fractions

def best_line(grid):
    n, m = grid.shape
    D = [(di, dj) for di in range(-(n - 1), n) for dj in range(-(n - 1), n)]
    def slope(d):
        di, dj = d
        if dj == 0:
            return float('inf') if di <= 0 else float('-inf'), -di
        else:
            return fractions.Fraction(di, dj), fractions.Fraction(-1, dj)
    D.sort(key=slope)
    D = np.array(D, dtype=np.int64)
    s_max = grid.sum()
    for grid in (grid, grid.T):
        left_sum = 0
        for j in range(grid.shape[1]):
            left_sum += grid[:,j].sum()
            for i in range(grid.shape[0]):
                p = np.array([i, j], dtype=np.int64)
                Q = p + D
                Q = Q[np.all((0 <= Q) & (Q < np.array(grid.shape)), axis=1)]
                s = left_sum 
                for q in Q:
                    if not np.any(q):
                        break
                    if q[1] <= j:
                        s -= grid[q[0],q[1]]
                    else:
                        s += grid[q[0],q[1]]
                    s_max = max(s_max, s)
    return s_max
Run Code Online (Sandbox Code Playgroud)

对于 n=30,此代码已经很慢。

在实际操作中,有什么办法可以加快速度吗?

测试用例

由于问题相当复杂,我给出了一些输入和输出示例。

最简单的测试用例是当输入矩阵仅由正(或负)整数组成时。在这种情况下,使部分与整个矩阵(如果所有整数均为负数,则为空矩阵)求和的行获胜。

稍微不太简单的是,是否有一条线将矩阵中的负整数与非负整数明确分开。

这是一个稍微困难一点的示例,其中显示了一条最佳线。最佳值为 14。

在此输入图像描述

机器可读形式的网格是:

[[ 3 -1 -2 -1]
 [ 0  1 -1  1]
 [ 1  1  3  0]
 [ 3  3 -1 -1]]
Run Code Online (Sandbox Code Playgroud)

这是最佳值 0 的示例。

在此输入图像描述

[[-3 -3  2 -3]
 [ 0 -2 -1  0]
 [ 1  0  2  0]
 [-1 -2  1 -1]]
Run Code Online (Sandbox Code Playgroud)

该矩阵的最佳得分为 31:

[[ 3  0  1  3 -1  1  1  3 -2 -1]
 [ 3 -1 -1  1  0 -1  2  1 -2  0]
 [ 2  2 -2  0  1 -3  0 -2  2  1]
 [ 0 -3 -3 -1 -1  3 -2  0  0  3]
 [ 2  2  3  2 -1  0  3  0 -3 -1]
 [ 1 -1  3  1 -3  3 -2  0 -3  0]
 [ 2 -2 -2 -3 -2  1 -2  0  0  3]
 [ 0  3  0  1  3 -1  2 -3  0 -2]
 [ 0 -2  2  2  2 -2  0  2  1  3]
 [-2 -2  0 -2 -2  2  0  2  3  3]]
Run Code Online (Sandbox Code Playgroud)

在 Python/numpy 中,制作更多测试矩阵的简单方法是:

import numpy as np
N = 30
square = np.random.randint(-3, 4, size=(N, N))
Run Code Online (Sandbox Code Playgroud)

定时

N = 30
np.random.seed(42)
big_square = randint(-3, 4, size=(N, N))
print(best_line(np.array(big_square)))
Run Code Online (Sandbox Code Playgroud)

需要 1 分 55 秒,输出为 57。

  • 对于 n=250,Andrej Kesely 的并行代码需要 1 分 5 秒。这是一个巨大的进步。

它还能变得更快吗?

Jér*_*ard 6

TL;DR:这个答案提供了比@AndrejKesely 更快的解决方案。也使用了 Numba,但最终的代码更加优化,尽管也更复杂。


第一个简单的实现

初始代码效率不高,因为它在循环中调用 Numpy 函数。在这种情况下,Numpy 函数非常昂贵。Numba 和 Cython是使代码显着更快的方法。

尽管如此,Nx2 数组上的操作效率并不高,无论是在 Numpy 中还是在 Numba 中。此外,生成临时数组(如Q)往往不是最佳的。解决方案通常是动态计算数组DQ而不生成临时数组。

这是一个简单的相对快速的实现,我们可以在此基础上编写:

import numpy as np
import numba as nb

# Naive implementation based on the initial code: see below
def generate_D(n):
    import fractions
    def slope(d):
        di, dj = d
        if dj == 0:
            return float('inf') if di <= 0 else float('-inf'), -di
        return fractions.Fraction(di, dj), fractions.Fraction(-1, dj)
    D = [(di, dj) for di in range(-(n - 1), n) for dj in range(-(n - 1), n)]
    D.sort(key=slope)
    return np.array(D, dtype=np.int64)

# Naive Numba implementation: see below
@nb.njit('(int64[:,::1], int64[:,::1], int64)')
def best_line_main_loop_seq(grid, D, s_max_so_far):
    n, m = grid.shape
    s_max = s_max_so_far
    left_sum = 0
    for j in range(m):
        left_sum += grid[:,j].sum()
        for i in range(n):
            s = left_sum
            for k in range(D.shape[0]):
                qi = D[k, 0] + i
                qj = D[k, 1] + j
                if 0 <= qi and qi < n and 0 <= qj and qj < m:
                    if qi == 0 and qj == 0:
                        break
                    val = grid[qi,qj]
                    s += -val if qj <= j else val
                    s_max = max(s_max, s)
    return s_max

# Main computing function
def best_line(grid):
    n, m = grid.shape
    D = generate_D(n)
    s_max = grid.sum()
    s_max = max(s_max, best_line_main_loop_par_unroll_transposed(grid.T.copy(), D, s_max))
    s_max = max(s_max, best_line_main_loop_par_unroll_transposed(grid, D, s_max))
    return s_max

# Benchmark
N = 30
np.random.seed(42)
big_square = np.random.randint(-3, 4, size=(N, N))
grid = np.array(big_square).astype(np.int64)
assert best_line(grid) == 57
%time best_line(grid)
Run Code Online (Sandbox Code Playgroud)

此顺序代码的性能与 @AndrejKesely 的并行实现之一相差不远 (在我的机器上有点慢,尤其是在较大的情况下N)。


更快地生成数组D

generate_D(n)由于模块效率相当低fractions,上述代码受到排序缓慢的限制,特别是对于小N值。通过直接比较 Numba 中的分数可以使代码更快。然而,不幸的是,Numba 不支持该sort函数的关键字,因此需要重新实现该功能。手动执行此操作有点麻烦,但速度的提高值得付出努力。这是最终的实现:

@nb.njit('(UniTuple(int64,2), UniTuple(int64,2))')
def compare_fraction(a, b):
    a_top, a_bot = a
    b_top, b_bot = b
    if a_bot < 0:
        a_top = -a_top
        a_bot = -a_bot
    if b_bot < 0:
        b_top = -b_top
        b_bot = -b_bot
    fixed_a = a_top * b_bot
    fixed_b = b_top * a_bot
    if fixed_a < fixed_b:
        return -1
    if fixed_a == fixed_b:
        return 0
    return 1

@nb.njit('(int64[::1], int64[::1])')
def compare_slope(a, b):
    ai, aj = a
    bi, bj = b
    if aj == 0: # slope_a is special
        if ai <= 0: # slope_a = (INF,-ai)
            if bj == 0 and bi <= 0:
                if -ai < -bi: return -1
                elif -ai == -bi: return 0
                else: return 1
            else:
                return 1
        else: # slope_a = (-INF,-ai)
            if bj == 0 and bi > 0: # slope_b = (-INF,-bi)
                if -ai < -bi: return -1
                elif -ai == -bi: return 0
                else: return 1
            else:
                return -1
    else:
        if bj == 0: # slope_b is special
            if bi <= 0: # slope_b = (INF,-bi)
                return -1
            else: # slope_b = (-INF,-bi)
                return 1
        slope_a = ((ai,aj), (-1,aj))
        slope_b = ((bi,bj), (-1,bj))
        res = compare_fraction(slope_a[0], slope_b[0])
        if res == 0:
            return compare_fraction(slope_a[1], slope_b[1])
        return res

# Quite naive quick-sort, but simple one
@nb.njit('(int64[:,::1],)')
def sort_D(arr):
    if len(arr) <= 1:
        return
    else:
        pivot = arr[0].copy()
        left = 1
        right = len(arr) - 1
        while True:
            while left <= right and compare_slope(arr[left], pivot) <= 0:
                left = left + 1
            while compare_slope(arr[right], pivot) >= 0 and right >= left:
                right = right - 1
            if right < left:
                break
            else:
                arr[left], arr[right] = arr[right].copy(), arr[left].copy()
        arr[0], arr[right] = arr[right].copy(), arr[0].copy()
        sort_D(arr[:right])
        sort_D(arr[right+1:])

@nb.njit('(int64,)')
def generate_D(n):
    D_list = [(di, dj) for di in range(-(n - 1), n) for dj in range(-(n - 1), n)]
    D_arr = np.array(D_list, dtype=np.int64)
    sort_D(D_arr)
    return D_arr
Run Code Online (Sandbox Code Playgroud)

上述功能在我的机器上快了10倍generate_D以上。


更快的主循环

上面提供的简单主循环代码可以改进。

首先,它可以并行化,尽管代码不能很好地扩展(当然是由于break循环中引起的工作不平衡)。这可以在外prange循环上有效地完成(在内循环上使用并不是最佳选择,因为工作量很小并且创建/连接线程很昂贵)。prange

此外,可以展开循环以减少条件检查的数量,尤其是在j. 虽然性能改进可能很显着(例如,速度提高了 2 倍),但生成的代码显然也不太可读/可维护(实际上相当难看)。这是一个需要付出的代价,因为该操作对于 Numba JIT(更普遍的是大多数编译器)来说太复杂了。

最后,通过虚拟转置数组,可以使数组访问更加缓存友好,从而提高内存访问的局部性(即访问目标网格中同一行的许多项目,而不是访问不同的行)。N此优化对于较大值(例如)特别有用>200

这是优化后的主要代码:

@nb.njit('(int64[:,::1], int64[:,::1], int64)', parallel=True, cache=True)
def best_line_main_loop_par_unroll_transposed(grid, D, s_max_so_far):
    m, n = grid.shape
    s_max = s_max_so_far
    all_left_sum = np.empty(m, dtype=np.int64)
    left_sum = 0
    for j in range(m):
        left_sum += grid[j,:].sum()
        all_left_sum[j] = left_sum
    for j in nb.prange(m):
        left_sum = all_left_sum[j]
        for i in range(0, n//4*4, 4):
            i1 = i
            i2 = i + 1
            i3 = i + 2
            i4 = i + 3
            s1 = left_sum
            s2 = left_sum
            s3 = left_sum
            s4 = left_sum
            continue_loop_i1 = True
            continue_loop_i2 = True
            continue_loop_i3 = True
            continue_loop_i4 = True
            for k in range(D.shape[0]):
                qj = D[k, 1] + j
                if 0 <= qj and qj < m:
                    qi1 = D[k, 0] + i1
                    qi2 = D[k, 0] + i2
                    qi3 = D[k, 0] + i3
                    qi4 = D[k, 0] + i4
                    mult = np.int64(-1 if qj <= j else 1)
                    if qj != 0:
                        if continue_loop_i1 and 0 <= qi1 and qi1 < n:  s1 += mult * grid[qj,qi1]
                        if continue_loop_i2 and 0 <= qi2 and qi2 < n:  s2 += mult * grid[qj,qi2]
                        if continue_loop_i3 and 0 <= qi3 and qi3 < n:  s3 += mult * grid[qj,qi3]
                        if continue_loop_i4 and 0 <= qi4 and qi4 < n:  s4 += mult * grid[qj,qi4]
                        s_max = max(s_max, max(max(s1, s2), max(s3, s4)))
                    else:
                        if continue_loop_i1 and 0 <= qi1 and qi1 < n:
                            if qi1 == 0 and qj == 0:
                                continue_loop_i1 = False
                            else:
                                s1 += mult * grid[qj,qi1]
                                s_max = max(s_max, s1)
                        if continue_loop_i2 and 0 <= qi2 and qi2 < n:
                            if qi2 == 0 and qj == 0:
                                continue_loop_i2 = False
                            else:
                                s2 += mult * grid[qj,qi2]
                                s_max = max(s_max, s2)
                        if continue_loop_i3 and 0 <= qi3 and qi3 < n:
                            if qi3 == 0 and qj == 0:
                                continue_loop_i3 = False
                            else:
                                s3 += mult * grid[qj,qi3]
                                s_max = max(s_max, s3)
                        if continue_loop_i4 and 0 <= qi4 and qi4 < n:
                            if qi4 == 0 and qj == 0:
                                continue_loop_i4 = False
                            else:
                                s4 += mult * grid[qj,qi4]
                                s_max = max(s_max, s4)
                        if not continue_loop_i1 and not continue_loop_i2 and not continue_loop_i3 and not continue_loop_i4:
                            break
        for i in range(n//4*4, n):
            s = left_sum
            for k in range(D.shape[0]):
                qi = D[k, 0] + i
                qj = D[k, 1] + j
                mult = np.int64(-1 if qj <= j else 1)
                if 0 <= qi and qi < n and 0 <= qj and qj < m:
                    if qi == 0 and qj == 0:
                        break
                    s += mult * grid[qj,qi]
                    s_max = max(s_max, s)
    return s_max

def best_line(grid):
    n, m = grid.shape
    D = generate_D(n)
    s_max = grid.sum()
    s_max = max(s_max, best_line_main_loop_par_unroll_transposed(grid.T.copy(), D, s_max))
    s_max = max(s_max, best_line_main_loop_par_unroll_transposed(grid, D, s_max))
    return s_max
Run Code Online (Sandbox Code Playgroud)

请注意,Numba 需要花费大量时间来编译此函数,因此需要cache=True标记以避免一遍又一遍地重新编译它。


绩效结果

以下是我的机器上的性能结果(使用 i5-9600KF CPU、Windows 上的 CPython 3.8.1 和 Numba 0.58.1):

With N = 30:
    - Initial code:             6461 ms
    - AndrejKesely's code:        54 ms
    - This code:                   4 ms     <-----

With N = 300:
    - Initial code:            TOO LONG
    - AndrejKesely's code:        109 s
    - This code:                   12 s     <-----
Run Code Online (Sandbox Code Playgroud)

因此,这个实现比最初的实现快了 1600 多倍,也比 @AndrejKesely 的实现快了 9 倍。这是速度最快的一次。


笔记

理论上,由于 SIMD 指令,所提供的实现可以进一步优化。然而,据我所知,使用 Numba 不可能轻松做到这一点。需要使用本机语言来做到这一点(例如 C、C++、Rust)。SIMD 友好的本地语言(例如 CUDA、ISPC)无疑是轻松实现这一目标的方法。事实上,使用本机 SIMD 内在函数或 SIMD 库手动执行此操作非常麻烦,并且可能会使代码完全不可读。我预计速度会快 2 到 4 倍,但肯定不会快很多。在CPU 上,这需要硬件支持快速SIMD 屏蔽加载指令和混合/屏蔽指令(例如最近的主流AMD/Intel x86-64 CPU)。在 GPU 上,需要关心保持 SIMD 通道主要处于活动状态(不是那么简单,因为 GPU 具有非常宽的 SIMD 寄存器,并且由于 和break条件而导致扭曲发散往往会增加),更不用说内存访问应该在没有存储体的情况下尽可能连续冲突以获得快速实现(尽管在这里这可能远非易事)。