Mat*_*ews 5 algorithm f# intersection simd avx
我正在尝试实现两个数据集的最快连接,并在索引匹配时取值的乘积。我有一个标量方法,但我相信使用 SIMD 我可以加速这个算法。我有两个Span<int>必须匹配才能获取产品的密钥:aKeys和bKeys. 如果 inaKeys中的值与 中的值相匹配,则应将 和 中bKeys的相应值相乘并存储。aValuesbValues
我期待使用SIMD指令能够从比较单一的钥匙aKeys在许多价值观bKeys。诀窍是,一旦 的值bKeys大于aKey我正在测试的值,我就需要移动到下一个aKey值。这是对这个问题的跟进。可以在此 repo 中找到带有基准代码的完整示例。
该算法IndexOf<T>在SpanHelpers是接近,但它仅用于查找单个值。我试图利用这样一个事实,即我有几个要查找的值,所有值都是唯一的并且按升序排列。
// NOTE: The length of aKeys and aValues is the same.
// The length of bKeys and bValues is the same.
// The values in `aKeys` and `bKeys` are unique and sorted but not every value
// in aKeys will be present in bKeys and vice versa.
let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
let maxN = Math.Min (aKeys.Length, bKeys.Length)
let outKeys = Array.zeroCreate maxN
let outValues = Array.zeroCreate maxN
let mutable aIdx = 0
let mutable bIdx = 0
let mutable outIdx = 0
while aIdx < aKeys.Length && bIdx < bKeys.Length do
if aKeys.[aIdx] = bKeys.[bIdx] then
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
outIdx <- outIdx + 1
aIdx <- aIdx + 1
bIdx <- bIdx + 1
elif aKeys.[aIdx] < bKeys.[bIdx] then
aIdx <- aIdx + 1
else
bIdx <- bIdx + 1
let resultKeys = Memory (outKeys, 0, outIdx)
let resultValues = Memory (outValues, 0, outIdx)
resultKeys, resultValues
Run Code Online (Sandbox Code Playgroud)
更新 2021-08-23 09:26
在对此进行了更多研究之后,我现在提出了以下方法。我很好奇是否有什么办法可以使代码运行得更快?
#nowarn "9" "51" "20" // Don't want warnings about pointers
open System
open FSharp.NativeInterop
open System.Numerics
open System.Runtime.Intrinsics.X86
open System.Runtime.Intrinsics
let hadamardProduct (aKeys: Span<int>, aValues: Span<float>, bKeys: Span<int>, bValues: Span<float>) =
let maxN = Math.Min (aKeys.Length, bKeys.Length)
let outKeys = Array.zeroCreate maxN
let outValues = Array.zeroCreate maxN
let mutable aIdx = 0
let mutable bIdx = 0
let mutable outIdx = 0
if bKeys.Length > 4 then
let lastBlockIdx = bKeys.Length - (bKeys.Length % 4)
let bPointer = && (bKeys.GetPinnableReference ())
let mutable bVector = Sse2.LoadVector128 (NativePtr.add bPointer bIdx)
while aIdx < aKeys.Length && bIdx < lastBlockIdx do
let aVector = Vector128.Create aKeys.[aIdx]
let comparison = Sse2.CompareEqual (aVector, bVector)
let matches = Sse2.MoveMask (comparison.AsByte ())
if matches > 0 then
let bIdxOffset = (BitOperations.TrailingZeroCount matches) / 4 // Convert byte offset to index
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx + bIdxOffset]
aIdx <- aIdx + 1
outIdx <- outIdx + 1
// REMEMBER, bIdx is testing 4 values at a time so we don't always want to jump
elif aKeys.[aIdx] > bKeys.[bIdx + 3] then
// REMEMBER!! bIdx needs to stride, not increment
bIdx <- bIdx + 4
// We only want to load new values when necessary
if bIdx < lastBlockIdx then
bVector <- Sse2.LoadVector128 (NativePtr.add bPointer bIdx)
else
aIdx <- aIdx + 1
// Final pass for data that didn't fit the Vector128 size
while aIdx < aKeys.Length && bIdx < bKeys.Length do
if aKeys.[aIdx] = bKeys.[bIdx] then
outKeys.[outIdx] <- aKeys.[aIdx]
outValues.[outIdx] <- aValues.[aIdx] * bValues.[bIdx]
outIdx <- outIdx + 1
aIdx <- aIdx + 1
bIdx <- bIdx + 1
elif aKeys.[aIdx] < bKeys.[bIdx] then
aIdx <- aIdx + 1
else
bIdx <- bIdx + 1
let resultKeys = Memory (outKeys, 0, outIdx)
let resultValues = Memory (outValues, 0, outIdx)
resultKeys, resultValues
Run Code Online (Sandbox Code Playgroud)