如何在C#中快速从另一个中减去一个ushort数组?

nb1*_*rxp 5 c# arrays performance ushort subtraction

我需要从ushort arrayB中具有相同长度的相应索引值中快速减去ushort arrayA中的每个值.

另外,如果差异为负,我需要存储零,而不是负差.

(确切地说,长度= 327680,因为我从另一个相同大小的图像中减去640x512图像).

下面的代码目前需要大约20ms,如果可能的话,我想在~5ms内将其降低.不安全的代码是可以的,但请提供一个例子,因为我不擅长编写不安全的代码.

谢谢!

public ushort[] Buffer { get; set; }

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
    sw.Start();

    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
}
Run Code Online (Sandbox Code Playgroud)

更新:虽然它不是严格意义上的C#,为了其他人的利益,我终于最终使用以下代码将C++ CLR类库添加到我的解决方案中.它运行在~3.1ms.如果使用非托管C++库,则运行时间约为2.2毫秒.由于时差很小,我决定使用托管库.

// SpeedCode.h
#pragma once
using namespace System;

namespace SpeedCode
{
    public ref class SpeedClass
    {
        public:
            static void SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength);
    };
}

// SpeedCode.cpp
// This is the main DLL file.
#include "stdafx.h"
#include "SpeedCode.h"

namespace SpeedCode
{
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16> ^ buffer, array<UInt16> ^ backgroundBuffer, int bufferLength)
    {
        for (int index = 0; index < bufferLength; index++)
        {
            buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index]));
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

然后我称之为:

    public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
    {
        System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
        sw.Start();

        SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length);

        Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));
    }
Run Code Online (Sandbox Code Playgroud)

nic*_*k_w 5

一些基准。

  1. SubtractBackgroundFromBuffer: 这是问题的原始方法。
  2. SubtractBackgroundFromBufferWithCalcOpt: 这是用TTat提高计算速度的想法增强的原始方法。
  3. SubtractBackgroundFromBufferParallelFor: Selman22's answer的解决方案。
  4. SubtractBackgroundFromBufferBlockParallelFor:我的答案。与 3. 类似,但将处理分成 4096 个值的块。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach: 杰夫的第一个回答。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack: 杰夫的第二个答案。

更新

有趣的是,我可以SubtractBackgroundFromBufferBlockParallelFor通过使用(如布鲁诺·科斯塔(Bruno Costa)建议的)获得小幅速度提升(~6% )

Buffer[i] = (ushort)Math.Max(difference, 0);
Run Code Online (Sandbox Code Playgroud)

代替

if (difference >= 0)
    Buffer[i] = (ushort)difference;
else
    Buffer[i] = 0;
Run Code Online (Sandbox Code Playgroud)

结果

请注意,这是每次运行 1000 次迭代的总时间。

SubtractBackgroundFromBuffer(ms):                                 2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):                      2,245.42
SubtractBackgroundFromBufferParallelFor(ms):                      4,021.58
SubtractBackgroundFromBufferBlockParallelFor(ms):                   769.74
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):         827.48
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):     539.60
Run Code Online (Sandbox Code Playgroud)

因此,从这些结果似乎是最好的方法结合了计算优化的一个小的收获和利用的Parallel.For对图像的块进行操作。您的里程当然会有所不同,并行代码的性能对您正在运行的 CPU 很敏感。

测试线束

我在发布模式下为每个方法运行了这个。我以Stopwatch这种方式启动和停止以确保只测量处理时间。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447);

for (int i = 0; i < 1000; i++)
{
    Buffer = GenerateRandomBuffer(327680, 128011992);                

    sw.Start();
    SubtractBackgroundFromBuffer(bgImg);
    sw.Stop();
}

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2"));


public static ushort[] GenerateRandomBuffer(int size, int randomSeed)
{
    ushort[] buffer = new ushort[327680];
    Random random = new Random(randomSeed);

    for (int i = 0; i < size; i++)
    {
        buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue);
    }

    return buffer;
}
Run Code Online (Sandbox Code Playgroud)

方法

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        int difference = Buffer[index] - backgroundBuffer[index];

        if (difference >= 0)
            Buffer[index] = (ushort)difference;
        else
            Buffer[index] = 0;
    }
}

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer)
{
    int bufferLength = Buffer.Length;

    for (int index = 0; index < bufferLength; index++)
    {
        if (Buffer[index] < backgroundBuffer[index])
        {
            Buffer[index] = 0;
        }
        else
        {
            Buffer[index] -= backgroundBuffer[index];
        }
    }
}

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer)
{
    Parallel.For(0, Buffer.Length, (i) =>
    {
        int difference = Buffer[i] - backgroundBuffer[i];
        if (difference >= 0)
            Buffer[i] = (ushort)difference;
        else
            Buffer[i] = 0;
    });
}        

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer)
{
    int blockSize = 4096;

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length / (double)blockSize), (j) =>
    {
        for (int i = j * blockSize; i < (j + 1) * blockSize; i++)
        {
            int difference = Buffer[i] - backgroundBuffer[i];

            Buffer[i] = (ushort)Math.Max(difference, 0);                    
        }
    });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            unsafe
            {
                var nonNegative = Buffer[i] > backgroundBuffer[i];
                Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                    *((int*)(&nonNegative)));
            }
        }
    });
}
Run Code Online (Sandbox Code Playgroud)


Erg*_*wun 5

这是个有趣的问题。

只有在测试结果不会为负(如 TTat 和Maximum Cookie所建议)之后才执行减法的影响可以忽略不计,因为 JIT 编译器已经可以执行此优化。

并行化任务(如Selman22所建议的)是一个好主意,但是当循环在这种情况下如此快时,开销最终会超过收益,因此Selman22 的实现实际上在我的测试中运行得更慢。我怀疑nick_w 的基准测试是在附加调试器情况下生成的,隐藏了这个事实。

以更大的块并行化任务(如nick_w所建议的)处理开销问题,实际上可以产生更快的性能,但您不必自己计算块 - 您可以使用它Partitioner来为您执行此操作:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                if (Buffer[i] < backgroundBuffer[i])
                {
                    Buffer[i] = 0;
                }
                else
                {
                    Buffer[i] -= backgroundBuffer[i];
                }
            }
        });
}
Run Code Online (Sandbox Code Playgroud)

在我的测试中,上述方法始终优于nick_w 的手动分块。

可是等等!还有更多的东西。

减慢代码速度的真正罪魁祸首不是赋值或算术。这是if声明。它如何影响性能将在很大程度上受到您正在处理的数据性质的影响。

nick_w 的基准测试为两个缓冲区生成相同量级的随机数据。但是,我怀疑您实际上很有可能在后台缓冲区中拥有较低的平均震级数据。由于分支预测,这个细节可能很重要(如这个经典的 SO 答案中所述)。

当后台缓冲区中的值通常小于缓冲区中的值时,JIT 编译器会注意到这一点,并相应地针对该分支进行优化。当每个缓冲区中的数据来自相同的随机群体时,无法以if超过 50% 的准确度猜测语句的结果。nick_w正是在后一种情况下进行基准测试,在这种情况下,我们可以通过使用不安全代码将 bool 转换为整数并完全避免分支来进一步优化您的方法。(请注意,以下代码依赖于 bool 如何在内存中表示的实现细节,虽然它适用于 .NET 4.5 中的场景,但它不一定是一个好主意,此处显示是为了说明目的。)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
        {
            for (int i = range.Item1; i < range.Item2; ++i)
            {
                unsafe
                {
                    var nonNegative = Buffer[i] > backgroundBuffer[i];
                    Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) *
                        *((int*)(&nonNegative)));
                }
            }
        });
}
Run Code Online (Sandbox Code Playgroud)

如果您真的希望节省更多时间,那么您可以通过将语言切换到 C++/CLI 以更安全的方式遵循此方法,因为这将使您可以在算术表达式中使用布尔值,而无需求助于不安全的代码:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend)
{
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend));
}
Run Code Online (Sandbox Code Playgroud)

您可以使用公开上述静态方法的 C++/CLI 创建一个纯托管 DLL,然后在您的 C# 代码中使用它:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer)
{
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range =>
    {
        for (int i = range.Item1; i < range.Item2; ++i)
        {
            Buffer[i] = 
                MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]);
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

这优于上面的 hacky unsafe C# 代码。事实上,它是如此之快,以至于您可以使用 C++/CLI 编写整个方法而忘记并行化,并且它仍然会胜过其他技术。

使用nick_w 的测试工具,上述方法的性能将优于此处发布的任何其他建议。以下是我得到的结果(1-4 是他尝试过的案例,5-7 是此答案中概述的案例):

1. SubtractBackgroundFromBuffer(ms):                               2,021.37
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                    2,125.80
3. SubtractBackgroundFromBufferParallelFor(ms):                    3,431.58
4. SubtractBackgroundFromBufferBlockParallelFor(ms):               1,401.36
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):     1,197.76
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   742.72
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    499.27
Run Code Online (Sandbox Code Playgroud)

但是,在我希望您实际拥有的场景中,背景值通常较小,成功的分支预测会全面改善结果,而避免if语句的“hack”实际上更慢:

以下是我使用nick_w 的测试工具将后台缓冲区中的值限制为范围0-6500大约为缓冲区的10%)时得到的结果:

1. SubtractBackgroundFromBuffer(ms):                                 773.50
2. SubtractBackgroundFromBufferWithCalcOpt(ms):                      915.91
3. SubtractBackgroundFromBufferParallelFor(ms):                    2,458.36
4. SubtractBackgroundFromBufferBlockParallelFor(ms):                 663.76
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):       658.05
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):   762.11
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms):    494.12
Run Code Online (Sandbox Code Playgroud)

您可以看到结果 1-5 都得到了显着改善,因为它们现在受益于更好的分支预测。结果 6 和 7 没有太大变化,因为它们避免了分支。

数据的这种变化完全改变了事情。在这种情况下,即使是最快的全 C# 解决方案现在也只比原始代码快 15%。

底线:请务必使用代表性数据测试您选择的任何方法,否则您的结果将毫无意义。