SIMD 指令,用于加速搜索两个已排序的 int 数组之间的匹配值,用于 Hadamard 乘积

Mat*_*ews 5 algorithm f# intersection simd avx

我正在尝试实现两个数据集的最快连接,并在索引匹配时取值的乘积。我有一个标量方法,但我相信使用 SIMD 我可以加速这个算法。我有两个Span<int>必须匹配才能获取产品的密钥:aKeysbKeys. 如果 inaKeys中的值与 中的值相匹配,则应将 和 中bKeys的相应值相乘并存储。aValuesbValues

我期待使用SIMD指令能够从比较单一的钥匙aKeys在许多价值观bKeys。诀窍是,一旦 的值bKeys大于aKey我正在测试的值,我就需要移动到下一个aKey值。这是对这个问题的跟进。可以在此 repo 中找到带有基准代码的完整示例。

该算法IndexOf<T>SpanHelpers是接近,但它仅用于查找单个值。我试图利用这样一个事实,即我有几个要查找的值,所有值都是唯一的并且按升序排列。

// NOTE: The length of aKeys and aValues is the same.
// The length of bKeys and bValues is the same.
// The values in `aKeys` and `bKeys` are unique and sorted but not every value
// in aKeys will be present in bKeys and vice versa.

let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
    let maxN = Math.Min (aKeys.Length, bKeys.Length)
    let outKeys = Array.zeroCreate maxN
    let outValues = Array.zeroCreate maxN

    let mutable aIdx = 0
    let mutable bIdx = 0
    let mutable outIdx = 0

    while aIdx < aKeys.Length && bIdx < bKeys.Length do
        
        if aKeys.[aIdx] = bKeys.[bIdx] then
            outKeys.[outIdx] <- aKeys.[aIdx]
            outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
            outIdx <- outIdx + 1
            aIdx <- aIdx + 1
            bIdx <- bIdx + 1
        elif aKeys.[aIdx] < bKeys.[bIdx] then
            aIdx <- aIdx + 1
        else
            bIdx <- bIdx + 1

    let resultKeys = Memory (outKeys, 0, outIdx)
    let resultValues = Memory (outValues, 0, outIdx)

    resultKeys, resultValues
Run Code Online (Sandbox Code Playgroud)

更新 2021-08-23 09:26

在对此进行了更多研究之后,我现在提出了以下方法。我很好奇是否有什么办法可以使代码运行得更快?

#nowarn "9" "51" "20" // Don't want warnings about pointers

open System
open FSharp.NativeInterop
open System.Numerics
open System.Runtime.Intrinsics.X86
open System.Runtime.Intrinsics

let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
    let maxN = Math.Min (aKeys.Length, bKeys.Length)
    let outKeys = Array.zeroCreate maxN
    let outValues = Array.zeroCreate maxN

    let mutable aIdx = 0
    let mutable bIdx = 0
    let mutable outIdx = 0

    if bKeys.Length > 4 then

        let lastBlockIdx = bKeys.Length - (bKeys.Length % 4)
        let bPointer = && (bKeys.GetPinnableReference ())
        let mutable bVector = Sse2.LoadVector128 (NativePtr.add bPointer bIdx)

        while aIdx < aKeys.Length && bIdx < lastBlockIdx do
            let aVector = Vector128.Create aKeys.[aIdx]
            let comparison = Sse2.CompareEqual (aVector, bVector)
            let matches = Sse2.MoveMask (comparison.AsByte ())

            if matches > 0 then
                let bIdxOffset = (BitOperations.TrailingZeroCount matches) / 4 // Convert byte offset to index
                outKeys.[outIdx] <- aKeys.[aIdx]
                outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx + bIdxOffset]
                aIdx <- aIdx + 1
                outIdx <- outIdx + 1
                // REMEMBER, bIdx is testing 4 values at a time so we don't always want to jump

            elif aKeys.[aIdx] > bKeys.[bIdx + 3] then
                // REMEMBER!! bIdx needs to stride, not increment
                bIdx <- bIdx + 4
                // We only want to load new values when necessary
                if bIdx < lastBlockIdx then
                    bVector <- Sse2.LoadVector128 (NativePtr.add bPointer bIdx)
            else
                aIdx <- aIdx + 1

    // Final pass for data that didn't fit the Vector128 size
    while aIdx < aKeys.Length && bIdx < bKeys.Length do
        
        if aKeys.[aIdx] = bKeys.[bIdx] then
            outKeys.[outIdx] <- aKeys.[aIdx]
            outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
            outIdx <- outIdx + 1
            aIdx <- aIdx + 1
            bIdx <- bIdx + 1
        elif aKeys.[aIdx] < bKeys.[bIdx] then
            aIdx <- aIdx + 1
        else
            bIdx <- bIdx + 1

    let resultKeys = Memory (outKeys, 0, outIdx)
    let resultValues = Memory (outValues, 0, outIdx)

    resultKeys, resultValues
Run Code Online (Sandbox Code Playgroud)