Python:如何使这种颜色阈值函数更有效

Ale*_*lex 5 python performance opencv numpy image-processing

我在Python中编写了一个自适应颜色阈值函数(因为OpenCV的cv2.adaptiveThreshold不符合我的需要)而且它太慢了.我已尽可能高效,但在1280x720图像上仍需要近500毫秒.

我将非常感谢任何能使这项功能更有效的建议!

这是函数的作用:它使用一个像素厚度的十字形作为结构元素.对于图像中的每个像素,它计算的平均值ksize相邻的像素在四个方向上独立地(即,平均的ksize同一行中的像素的左侧,在同一列的上方,在同一行的右边中,并在下面的同一栏).我以四个平均值结束,每个方向一个.如果像素比左右平均值或者顶部和底部平均值(加上一些常数C)更亮,则该像素仅满足阈值标准.

我使用同时为所有像素逐步计算这些平均值numpy.roll(),但我仍然需要这样做ksize.在ksize通常是20-50.

这是代码,相关部分实际上就是for循环中发生的事情:

def bilateral_adaptive_threshold(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):

    mask = np.full(img.shape, false_value, dtype=np.int16)

    left_thresh = np.zeros_like(img, dtype=np.float32) #Store the right-side average of each pixel here
    right_thresh = np.zeros_like(img, dtype=np.float32) #Store the left-side average of each pixel here
    up_thresh = np.zeros_like(img, dtype=np.float32) #Store the top-side average of each pixel here
    down_thresh = np.zeros_like(img, dtype=np.float32) #Store the bottom-side average of each pixel here

    for i in range(1, ksize+1): 
        roll_left = np.roll(img, -i, axis=1)
        roll_right = np.roll(img, i, axis=1)
        roll_up = np.roll(img, -i, axis=0)
        roll_down = np.roll(img, i, axis=0)

        roll_left[:,-i:] = 0
        roll_right[:,:i] = 0
        roll_up[-i:,:] = 0
        roll_down[:i,:] = 0

        left_thresh += roll_right
        right_thresh += roll_left
        up_thresh += roll_down
        down_thresh += roll_up

    left_thresh /= ksize
    right_thresh /= ksize
    up_thresh /= ksize
    down_thresh /= ksize

    if mode == 'floor':
        mask[((img > left_thresh+C) & (img > right_thresh+C)) | ((img > up_thresh+C) & (img > down_thresh+C))] = true_value
    elif mode == 'ceil':
        mask[((img < left_thresh-C) & (img < right_thresh-C)) | ((img < up_thresh-C) & (img < down_thresh-C))] = true_value
    else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

    return mask
Run Code Online (Sandbox Code Playgroud)

Dan*_*šek 7

当你提示你的问题时,函数的主要部分是获得计算平均值所需的4个数组 - 这里整个函数的平均值为210毫秒.那么,让我们关注它.

首先,必要的进口和便利计时功能.

from timeit import default_timer as timer
import numpy as np
import cv2

## ===========================================================================

def time_fn(fn, img, ksize=20, iters=16):
    start = timer()
    for i in range(iters):
        fn(img, ksize)
    end = timer()
    return ((end - start) / iters) * 1000

## ===========================================================================
# Our test image
img = np.uint8(np.random.random((720,1280)) * 256)
Run Code Online (Sandbox Code Playgroud)

原始实施

我们可以通过以下方式减少您的功能,以便它只计算并返回4个和数组.我们以后可以使用它来检查优化版本是否返回相同的结果.

# Original code
def windowed_sum_v1(img, ksize=20):
    left_thresh = np.zeros_like(img, dtype=np.float32)
    right_thresh = np.zeros_like(img, dtype=np.float32)
    up_thresh = np.zeros_like(img, dtype=np.float32)
    down_thresh = np.zeros_like(img, dtype=np.float32)

    for i in range(1, ksize+1): 
        roll_left = np.roll(img, -i, axis=1)
        roll_right = np.roll(img, i, axis=1)
        roll_up = np.roll(img, -i, axis=0)
        roll_down = np.roll(img, i, axis=0)

        roll_left[:,-i:] = 0
        roll_right[:,:i] = 0
        roll_up[-i:,:] = 0
        roll_down[:i,:] = 0

        left_thresh += roll_right
        right_thresh += roll_left
        up_thresh += roll_down
        down_thresh += roll_up

    return (left_thresh, right_thresh, up_thresh, down_thresh)
Run Code Online (Sandbox Code Playgroud)

现在我们可以找到此函数在本地计算机上花费的时间:

>>> print "V1: %f ms" % time_fn(windowed_sum_v1, img, 20, 16)
V1: 188.572077 ms
Run Code Online (Sandbox Code Playgroud)

改进#1

numpy.roll必然会涉及一些开销,但这里没有必要深入研究.请注意,在滚动数组之后,会将遍布数组边缘的行或列清零.然后将其添加到累加器.添加零不会改变结果,因此我们也可以避免这种情况.相反,我们可以添加整个阵列的渐进的较小且适当偏移的切片,从而避免roll和(稍微)减少所需的添加总数.

# Summing up ROIs
def windowed_sum_v2(img, ksize=20):
    h,w=(img.shape[0], img.shape[1])

    left_thresh = np.zeros_like(img, dtype=np.float32)
    right_thresh = np.zeros_like(img, dtype=np.float32)
    up_thresh = np.zeros_like(img, dtype=np.float32)
    down_thresh = np.zeros_like(img, dtype=np.float32)

    for i in range(1, ksize+1): 
        left_thresh[:,i:] += img[:,:w-i]
        right_thresh[:,:w-i] += img[:,i:]
        up_thresh[i:,:] += img[:h-i,:]
        down_thresh[:h-i,:] += img[i:,:]

    return (left_thresh, right_thresh, up_thresh, down_thresh)
Run Code Online (Sandbox Code Playgroud)

让我们测试一下并计算时间:

>>> print "Results equal (V1 vs V2): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v2(img)))
Results equal (V1 vs V2): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v2, img, 20, 16)
V2: 110.861794 ms
Run Code Online (Sandbox Code Playgroud)

此实现仅占原始时间的60%.我们可以做得更好吗?

改进#2

我们还有一个循环.如果我们可以通过对一些优化函数的单次调用来替换重复添加,那将是很好的.一个这样的功能是cv2.filter2D,它计算如下:

filter2D

我们可以创建一个内核,这样我们想要添加1.0的点的权重和内核所锚定的点的权重为0.0.

例如,何时ksize=8,我们可以使用以下内核和锚位置.

ksize的内核= 8

该功能如下:

# Using filter2d
def windowed_sum_v3(img, ksize=20):
    kernel_l = np.array([[1.0] * (ksize) + [0.0]])
    kernel_r = np.array([[0.0] + [1.0] * (ksize)])
    kernel_u = np.array([[1.0]] * (ksize) + [[0.0]])
    kernel_d = np.array([[0.0]] + [[1.0]] * (ksize))

    left_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_32F, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

    return (left_thresh, right_thresh, up_thresh, down_thresh)
Run Code Online (Sandbox Code Playgroud)

再次,让我们测试一下这个函数的时间:

>>> print "Results equal (V1 vs V3): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v3(img)))
Results equal (V1 vs V3): True
>>> print "V2: %f ms" % time_fn(windowed_sum_v3, img, 20, 16)
V3: 46.652996 ms
Run Code Online (Sandbox Code Playgroud)

我们降到原来时间的25%.

改进#3

我们正在浮点工作,但是现在我们不进行任何划分,内核只包含1和0.这意味着我们可以使用整数.你提到最大窗口大小是50,这意味着我们使用16位有符号整数是安全的.整数数学往往更快,如果我们使用的代码被正确地矢量化,我们可以一次处理两次.让我们给出一个镜头,让我们还提供一个包装器,它以浮点格式返回结果,与之前的版本一样.

# Integer only
def windowed_sum_v4(img, ksize=20):
    kernel_l = np.array([[1] * (ksize) + [0]], dtype=np.int16)
    kernel_r = np.array([[0] + [1] * (ksize)], dtype=np.int16)
    kernel_u = np.array([[1]] * (ksize) + [[0]], dtype=np.int16)
    kernel_d = np.array([[0]] + [[1]] * (ksize), dtype=np.int16)

    left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), borderType=cv2.BORDER_CONSTANT)

    return (left_thresh, right_thresh, up_thresh, down_thresh)

# Integer only, but returning floats    
def windowed_sum_v5(img, ksize=20):
    result = windowed_sum_v4(img, ksize)
    return map(np.float32,result)
Run Code Online (Sandbox Code Playgroud)

我们来试试吧.

>>> print "Results equal (V1 vs V4): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v4(img)))
Results equal (V1 vs V4): True
>>> print "Results equal (V1 vs V5): %s" % (np.array_equal(windowed_sum_v1(img), windowed_sum_v5(img)))
Results equal (V1 vs V5): True
>>> print "V4: %f ms" % time_fn(windowed_sum_v4, img, 20, 16)
V4: 14.712223 ms
>>> print "V5: %f ms" % time_fn(windowed_sum_v5, img, 20, 16)
V5: 20.859744 ms
Run Code Online (Sandbox Code Playgroud)

如果我们对16位整数很好,那么我们会降到7%,如果我们想要浮点数,我们会降到10%.

进一步改进

让我们回到您编写的完整阈值函数.我们可以扩展内核,filter2D直接返回均值,而不是将总和除以单独的步骤来获得平均值.这只是一个很小的改进(~3%).

同样,您可以C通过提供适合deltafilter2D呼叫来替换加法或减法.这再次削减了几个百分点.

注意:如果实施上述两个更改,由于浮点表示的限制,可能会遇到很少的差异.

另一种可能性是将确定掩模所需的比较作为矩阵与标量的比较:

input < threshold
input - input < threshold - input
0 < threshold - input
0 < adjusted_threshold            # determined using adjusted kernel
Run Code Online (Sandbox Code Playgroud)

我们可以通过修改内核来减去由适当的权重(ksize)缩放的锚像素的值来实现这一点.有了numpy,这似乎只有微小的差别,虽然我理解它的方式,我们可以节省一半的算法的那部分读数(虽然filter2D可能仍然读取并乘以相应的值,即使权重为0).

最快的阈值功能实现

考虑到所有这些,我们可以像这样重写你的函数,并在原始的~12.5%时间内获得相同的结果:

def bilateral_adaptive_threshold5(img, ksize=20, C=0, mode='floor', true_value=255, false_value=0):
    mask = np.full(img.shape, false_value, dtype=np.uint8)

    kernel_l = np.array([[1] * (ksize) + [-ksize]], dtype=np.int16)
    kernel_r = np.array([[-ksize] + [1] * (ksize)], dtype=np.int16)
    kernel_u = np.array([[1]] * (ksize) + [[-ksize]], dtype=np.int16)
    kernel_d = np.array([[-ksize]] + [[1]] * (ksize), dtype=np.int16)

    if mode == 'floor':
        delta = C * ksize
    elif mode == 'ceil':
        delta = -C * ksize
    else: raise ValueError("Unexpected mode value. Expected value is 'floor' or 'ceil'.")

    left_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_l, anchor=(ksize,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
    right_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_r, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)
    up_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_u, anchor=(0,ksize), delta=delta, borderType=cv2.BORDER_CONSTANT)
    down_thresh = cv2.filter2D(img, cv2.CV_16S, kernel_d, anchor=(0,0), delta=delta, borderType=cv2.BORDER_CONSTANT)

    if mode == 'floor':
        mask[((0 > left_thresh) & (0 > right_thresh)) | ((0 > up_thresh) & (0 > down_thresh))] = true_value
    elif mode == 'ceil':
        mask[((0 < left_thresh) & (0 < right_thresh)) | ((0 < up_thresh) & (0 < down_thresh))] = true_value

    return mask
Run Code Online (Sandbox Code Playgroud)

  • 谢谢你这本教科书质量的答案,太神奇了!但有一件事:我不太清楚上面定义的内核如何正确地执行缩放.假设您将内核卷积(关联)在所有具有相同值的像素块上,即"a".然后`kernel_l`会将锚像素的结果计算为`ksize*a +(-ksize)*a = 0`,即使平均值应为'a`.它必须是`kernel_l = np.array([[1]*(ksize)+ [-ksize + 1]],dtype = np.int16)`(注意末尾的`+ 1`),但是那么它只是一般情况的真实平均值的近似值? (2认同)
  • @Alex它不再仅仅计算平均值,它还会减去输入,以便我们以后可以与标量0进行比较.因此,在您的示例中,0是预期结果.我通过`ksize`常量来扩展所有内容,以消除除法的需要,允许我们只使用整数. (2认同)
  • 哦,是的,我现在看到了:将平均值与锚值进行比较相当于将总和与'ksize`乘以锚值进行比较,并且比较这两个值相当于将它们的差值与零进行比较.简单的算术,我的坏.谢谢! (2认同)
  • 我注意到一个细节:在最后的实现中,`C`需要通过`ksize`重新调整,以使计算等同于原始函数. (2认同)