Branchless K-means (or other optimizations)

Dra*_*rgy · 59 · tags: c++, optimization, performance

Note: I'd appreciate more of a guide to how to approach and come up with these kinds of solutions rather than the solution itself.

I have a very performance-critical function in my system that shows up as the number one profiling hotspot in specific contexts. It's in the middle of a k-means iteration (already multi-threaded, using a parallel for that processes sub-ranges of points in each worker thread).

ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float dist = ...;
    if (dist < pt.min_dist) // <-- #1 hotspot
    {
        pt.min_dist = dist;
        pt.min_index = i;
    }
}

Shaving time off this section of code pays off substantially, so I've often been fiddling with it. For example, it might be worth hoisting the centroid loop outside and, for a given centroid, iterating through the points in parallel (a rough sketch of that inversion follows below). The number of cluster points here spans in the millions, while the number of centroids spans in the thousands. The algorithm is applied for a handful of iterations (usually under 10). It doesn't seek perfect convergence/stability, just some "reasonable" approximation.
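
To make the inversion concrete, here's roughly the restructuring I have in mind (an untested sketch; the struct layouts are guessed from the code and disassembly above):

#include <limits>

struct ClusterCentroid { float pos[3]; /* other members omitted */ };
struct ClusterPoint    { int min_index; float min_dist; float pos[3]; };

// Centroids on the outside, points on the inside: each point's running
// minimum is updated as every centroid streams past the whole point array.
void assign_centroids(ClusterPoint* points, int num_points,
                      const ClusterCentroid* centroids, int num_centroids)
{
    for (int j = 0; j < num_points; ++j)
    {
        points[j].min_index = -1;
        points[j].min_dist = std::numeric_limits<float>::max();
    }
    for (int i = 0; i < num_centroids; ++i)
    {
        const ClusterCentroid& cent = centroids[i];
        for (int j = 0; j < num_points; ++j) // candidate for the parallel for
        {
            ClusterPoint& pt = points[j];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
            const float z = pt.pos[2] - cent.pos[2];
            const float dist = x*x + y*y + z*z;
            if (dist < pt.min_dist)
            {
                pt.min_dist = dist;
                pt.min_index = i;
            }
        }
    }
}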

Any ideas are appreciated, but what I'm really eager to discover is if this code can be made branchless as it would allow for a SIMD version. I haven't really developed the kind of mental ability to easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days, so a guide on how to write branchless code and how to develop the appropriate mindset for it would also be helpful.
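
For reference, here's the shape of what I mean by "branchless" in scalar form (my own illustration, not from any answer below): both outcomes are computed and one is selected through a data dependency rather than a conditional jump. Whether a compiler actually emits cmov instead of a branch for this is a separate question.

#include <limits>

// Scalar sketch of a branchless running minimum with index. The ternaries
// have no side effects, so the optimizer is free to compile them to
// conditional moves instead of jumps.
static int nearest_branchless(const float* dists, int n)
{
    float min_dist = std::numeric_limits<float>::max();
    int min_index = -1;
    for (int i = 0; i < n; ++i)
    {
        const bool closer = dists[i] < min_dist;
        min_dist  = closer ? dists[i] : min_dist;  // select, not jump
        min_index = closer ? i        : min_index;
    }
    return min_index;
}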

In short, I'm looking for any guides, hints, and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements, but my blind spot has always been micro-optimization (and I'm curious to learn how to apply it more effectively without going overboard). The code is already tightly multithreaded with chunky parallel for logic, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try short of a smarter algorithm outright. We're completely free to change the memory layout.

In Response to Algorithmic Suggestions

On the point that I'm looking at this all wrong by seeking to micro-optimize an O(knm) algorithm that could clearly be improved at the algorithmic level, I wholeheartedly agree. That pushes this specific question into a somewhat academic and impractical realm. However, if I could be allowed an anecdote: I come from an original background of high-level programming -- big emphasis on a broad, large-scale viewpoint and on safety, and very little on the low-level implementation details. I've recently switched to a very different kind of modern-flavored project, and I'm learning all kinds of new tricks from my peers: cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose memory allocators that actually outperform malloc (in specific scenarios), and so on.

That's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that the old linked/tree-style data structures I favored during the 90s are often vastly outperformed by much more naive, brutish, micro-optimized, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time, since I feel like we're now fitting the algorithms to the machine and narrowing the possibilities that way (especially with GPGPU).

The funniest thing is that I find this type of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, it's easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, slap on a parallel for and possibly some SIMD, and call it done with a decent speedup. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness with which these micro-optimizations can be applied has me wanting to learn more in that area, since reading papers on better algorithms takes time (and tends to require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

Disassembly

Note: I'm really, really bad at assembly, so I've often tuned things in a trial-and-error way: coming up with somewhat educated guesses about why a hotspot shown in VTune might be the bottleneck, then trying things out to see if the times improve, assuming the guesses have some hint of truth if they do, or that I completely missed the mark if they don't.

000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h) 
    {
        ClusterPoint& pt = points[j];
        pt.min_index = -1;
        pt.min_dist = numeric_limits<float>::max();
        for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d 
000007FEEE3FB8AA  jge         thread_partition+1F4h (7FEEE3FB904h) 
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2] 
000007FEEE3FB8B0  add         rax,rax 
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8] 
        {
            const ClusterCentroid& cent = centroids[i];
            const float x = pt.pos[0] - cent.pos[0];
            const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx] 
            const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4] 
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4] 
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8] 
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4] 
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8] 
            const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2 
000007FEEE3FB8DB  mulss       xmm0,xmm0 
000007FEEE3FB8DF  mulss       xmm1,xmm1 
000007FEEE3FB8E3  addss       xmm2,xmm0 
000007FEEE3FB8E7  addss       xmm2,xmm1 

            if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8] 
000007FEEE3FB8EF  jae         thread_partition+1E9h (7FEEE3FB8F9h) 
            {
                pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2 
                pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx 
000007FEEE3FB8F9  inc         ecx  
000007FEEE3FB8FB  add         r8,30h 
000007FEEE3FB8FF  cmp         ecx,r10d 
000007FEEE3FB902  jl          thread_partition+1A8h (7FEEE3FB8B8h) 
    for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi  
000007FEEE3FB906  add         rdx,20h 
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4] 
000007FEEE3FB90D  jl          thread_partition+31h (7FEEE3FB741h) 
000007FEEE3FB913  mov         rbx,qword ptr [irange] 
            }
        }
    }
}

We're forced into targeting SSE 2 -- a bit behind the times, but our user base actually tripped us up once when we assumed that even SSE 4 was okay as a minimum requirement (one user had some prototype Intel machine).

Update with Standalone Test: ~5.6 secs

I'm very appreciative of all the help being offered! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out so that I can experiment with all these graciously offered solutions.

#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#include <cstdlib> // for rand()
#if defined(_MSC_VER)
    #define ALIGN16 __declspec(align(16))
#else
    #include <stdlib.h> // for posix_memalign
    #define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
    #ifdef _MSC_VER
        return _aligned_malloc(amount, 16);
    #else
        void* mem = 0;
        posix_memalign(&mem, 16, amount);
        return mem;
    #endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
    return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
    #ifdef _MSC_VER
        return _aligned_free(mem);
    #else
        free(mem);
    #endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
    Points(): data(malloc16_t<Point>(num_points))
    {
        for (int p=0; p < num_points; ++p)
        {
            const float xyz[3] =
            {
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX,
                range * static_cast<float>(rand()) / RAND_MAX
            };
            init(p, xyz);
        }
    }
    ~Points()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].centroid = -1;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void associate(int n, int new_centroid)
    {
        data[n].centroid = new_centroid;
    }
    int centroid(int n) const
    {
        return data[n].centroid;
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }

private:
    Points(const Points&);
    Points& operator=(const Points&);
    struct Point
    {
        int centroid;
        float xyz[3];
    };
    Point* data;
};

class Centroids
{
public:
    Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
    {
        // Naive initial selection algorithm, but outside the 
        // current area of interest.
        for (int c=0; c < num_centroids; ++c)
            init(c, points[c]);
    }
    ~Centroids()
    {
        free16(data);
    }
    void init(int n, const float* xyz)
    {
        data[n].count = 0;
        data[n].xyz[0] = xyz[0];
        data[n].xyz[1] = xyz[1];
        data[n].xyz[2] = xyz[2];
    }
    void reset(int n)
    {
        data[n].count = 0;
        data[n].xyz[0] = 0.0f;
        data[n].xyz[1] = 0.0f;
        data[n].xyz[2] = 0.0f;
    }
    void sum(int n, const float* pt_xyz)
    {
        data[n].xyz[0] += pt_xyz[0];
        data[n].xyz[1] += pt_xyz[1];
        data[n].xyz[2] += pt_xyz[2];
        ++data[n].count;
    }
    void average(int n)
    {
        if (data[n].count > 0)
        {
            const float inv_count = 1.0f / data[n].count;
            data[n].xyz[0] *= inv_count;
            data[n].xyz[1] *= inv_count;
            data[n].xyz[2] *= inv_count;
        }
    }
    float* operator[](int n)
    {
        return data[n].xyz;
    }
    int find_nearest(const float* pt_xyz) const
    {
        float min_dist_squared = numeric_limits<float>::max();
        int min_centroid = -1;
        for (int c=0; c < num_centroids; ++c)
        {
            const float* cen_xyz = data[c].xyz;
            const float x = pt_xyz[0] - cen_xyz[0];
            const float y = pt_xyz[1] - cen_xyz[1];
            const float z = pt_xyz[2] - cen_xyz[2];
            const float dist_squared = x*x + y*y + z*z;

            if (min_dist_squared > dist_squared)
            {
                min_dist_squared = dist_squared;
                min_centroid = c;
            }
        }
        return min_centroid;
    }

private:
    Centroids(const Centroids&);
    Centroids& operator=(const Centroids&);
    struct Centroid
    {
        int count;
        float xyz[3];
    };
    Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
    return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
    // Find the closest centroid for each point.
    for (int p=0; p < num_points; ++p)
    {
        const float* pt_xyz = points[p];
        points.associate(p, centroids.find_nearest(pt_xyz));
    }

    // Reset the data of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.reset(c);

    // Compute new position sum of each centroid.
    for (int p=0; p < num_points; ++p)
        centroids.sum(points.centroid(p), points[p]);

    // Compute average position of each centroid.
    for (int c=0; c < num_centroids; ++c)
        centroids.average(c);
}

int main()
{
    Points points;
    Centroids centroids(points);

    cout << "Starting simulation..." << endl;
    double start_time = sys_time();
    for (int i=0; i < num_iterations; ++i)
        k_means(points, centroids);
    cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
    cout << "# Points: " << num_points << endl;
    cout << "# Centroids: " << num_centroids << endl;

    // Write the centroids to a file to give us some crude verification
    // of consistency as we make changes.
    ofstream out("centroids.txt");
    for (int c=0; c < num_centroids; ++c)
        out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}

I'm aware of the dangers of superficial testing, but since it's already deemed to be a hotspot from previous real-world sessions, I hope it's excusable. I'm also just interested in the general techniques associated with micro-optimizing such code.

I did get slightly different results profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members, hoisted out the min_dist member, and made it a local variable). The exact ratio between centroids and points is also a bit different, but hopefully close enough to translate improvements here to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different, so I may be risking optimizing the superficial test without the original (a risk I'm willing to take for now, since I'm more interested in expanding my knowledge of techniques that could optimize these cases than in a solution for this exact case).

[VTune profiler screenshot]

Update with Yochai Timmer's Suggestion -- ~12.5 secs

Oh, I face the woes of micro-optimization without understanding assembly very well. I replaced this:

        -if (min_dist_squared > dist_squared)
        -{
        -    min_dist_squared = dist_squared;
        -    pt.centroid = c;
        -}

With this:

        +const bool found_closer = min_dist_squared > dist_squared;
        +pt.centroid = bitselect(found_closer, c, pt.centroid);
        +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

... only to find the times escalated from ~5.6 secs to ~12.5 secs. Nevertheless, that's not his fault, nor does it take away from the value of his solution -- that's on me for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed, and apparently I was not the victim of branch misprediction as I had initially thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. Now for round 2.

Harold's SIMD Solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster rep to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output, which suggests more than slight precision differences, including some centroids towards the end with values of 0 (implying they were never found in the search). I've been going through the SIMD logic with the debugger to see what might be up -- it could merely be a transcription error on my part, but here's the code in case someone can spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined from a pure micro-optimization!

    // New version of Centroids::find_nearest (from harold's solution):
    int find_nearest(const float* pt_xyz) const
    {
        __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
        __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x));
        __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y));
        __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z));
        __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
        __m128i index = min_index;
        for (int i=4; i < num_centroids; i += 4) 
        {
            xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
            ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
            zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
            __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                                _mm_mul_ps(ydif, ydif)), 
                                                _mm_mul_ps(zdif, zdif));
            __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
            min_dist = _mm_min_ps(min_dist, dist);
            min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                     _mm_andnot_si128(mask, min_index));
            index = _mm_add_epi32(index, _mm_set1_epi32(4));
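            // (The error, as it turned out: in harold's original this
            // increment happens *before* the compare/select, so the select
            // here records indices that lag one group of four behind.)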
        }

        ALIGN16 float mdist[4];
        ALIGN16 uint32_t mindex[4];
        _mm_store_ps(mdist, min_dist);
        _mm_store_si128((__m128i*)mindex, min_index);

        float closest = mdist[0];
        int closest_i = mindex[0];
        for (int i=1; i < 4; i++)
        {
            if (mdist[i] < closest) 
            {
                closest = mdist[i];
                closest_i = mindex[i];
            }
        }
        return closest_i;
    }

Harold's SIMD Solution (Corrected) - ~2.5 secs

After applying the corrections and testing them out, the results are intact and function correctly with similar improvements to the original codebase!

Since this hits the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution some extra props for more than doubling the speed of the operation. I have my homework cut out for me in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand my personal understanding of possible solutions for dealing with them.

Nevertheless, I'm grateful for all the contributions here, from the algorithmic suggestions to the really cool bitselect trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now my homework is in understanding some of these non-arithmetical SIMD ops.

int find_nearest_simd(const float* pt_xyz) const
{
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
    __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
    __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

    __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x));
    __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y));
    __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i=4; i < num_centroids; i += 4) 
    {
        xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
        ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
        zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
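        // The fix relative to my earlier transcription: 'index' is advanced
        // *before* the compare/select, so the mask picks up the indices of
        // the four centroids just measured.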
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }

    ALIGN16 float mdist[4];
    ALIGN16 uint32_t mindex[4];
    _mm_store_ps(mdist, min_dist);
    _mm_store_si128((__m128i*)mindex, min_index);

    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i=1; i < 4; i++)
    {
        if (mdist[i] < closest) 
        {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}
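
As a note to self on those non-arithmetical ops: the and/andnot/or trio in the loop is a masked select. Here's a scalar sketch of what it computes per 32-bit lane, relying on _mm_cmplt_ps producing all-ones or all-zeros in each lane:

#include <cstdint>

// Per-lane meaning of:
//   min_index = _mm_or_si128(_mm_and_si128(index, mask),
//                            _mm_andnot_si128(mask, min_index));
static inline uint32_t lane_select(uint32_t mask, uint32_t a, uint32_t b)
{
    // mask is 0xFFFFFFFF where dist < min_dist, 0x00000000 elsewhere.
    return (a & mask) | (b & ~mask);   // a where the mask is set, b elsewhere
}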

harold · 22

Too bad we can't use SSE4.1, but very well then, SSE2 it is. I haven't tested this, just compiled it to see whether there were any syntax errors and whether the assembly made sense (it's mostly fine, though GCC spills min_index even while some xmm registers go unused; not sure why that happens).

int find_closest(float *x, float *y, float *z,
                 float pt_x, float pt_y, float pt_z, int n) {
    __m128i min_index = _mm_set_epi32(3, 2, 1, 0);
    __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x));
    __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y));
    __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z));
    __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
    __m128i index = min_index;
    for (int i = 4; i < n; i += 4) {
        xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
        ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
        zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
        __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), 
                                            _mm_mul_ps(ydif, ydif)), 
                                            _mm_mul_ps(zdif, zdif));
        index = _mm_add_epi32(index, _mm_set1_epi32(4));
        __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist));
        min_dist = _mm_min_ps(min_dist, dist);
        min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                                 _mm_andnot_si128(mask, min_index));
    }
    float mdist[4];
    _mm_store_ps(mdist, min_dist);
    uint32_t mindex[4];
    _mm_store_si128((__m128i*)mindex, min_index);
    float closest = mdist[0];
    int closest_i = mindex[0];
    for (int i = 1; i < 4; i++) {
        if (mdist[i] < closest) {
            closest = mdist[i];
            closest_i = mindex[i];
        }
    }
    return closest_i;
}

As usual, it expects the pointers to be 16-aligned. Also, any padding should consist of points at infinity (so they can never be closest to the target).
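
For example, the padding could be set up like this (untested sketch; helper name hypothetical):

#include <limits>

// Pad the SoA tails so n can be treated as a multiple of 4; +inf
// distances can never beat a real minimum.
static void pad_centroids(float* x, float* y, float* z, int n)
{
    const float inf = std::numeric_limits<float>::infinity();
    const int padded = (n + 3) & ~3;   // round up to a multiple of 4
    for (int i = n; i < padded; ++i)
    {
        x[i] = inf;
        y[i] = inf;
        z[i] = inf;
    }
}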

SSE 4.1 would let you replace this

min_index = _mm_or_si128(_mm_and_si128(index, mask), 
                         _mm_andnot_si128(mask, min_index));

with this

min_index = _mm_blendv_epi8(min_index, index, mask);

Here's an asm version, made for vsyasm, tested a bit (seems to work):

bits 64

section .data

align 16
centroid_four:
    dd 4, 4, 4, 4
centroid_index:
    dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
    ;
    ;   arguments:
    ;       ecx: number of points (multiple of 4 and at least 4)
    ;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
    ;       r8 -> array of 3 floats (the reference point)
    ;
    alloc_stack 0x58
    save_xmm128 xmm6, 0
    save_xmm128 xmm7, 16
    save_xmm128 xmm8, 32
    save_xmm128 xmm9, 48
[endprolog]
    movss xmm0, [r8]
    shufps xmm0, xmm0, 0
    movss xmm1, [r8 + 4]
    shufps xmm1, xmm1, 0
    movss xmm2, [r8 + 8]
    shufps xmm2, xmm2, 0
    ; pointers to x, y, z in r8, r9, r10
    mov r8, [rdx]
    mov r9, [rdx + 8]
    mov r10, [rdx + 16]
    ; reference point is in xmm0, xmm1, xmm2 (x, y, z)
    movdqa xmm3, [rel centroid_index]   ; min_index
    movdqa xmm4, xmm3                   ; current index
    movdqa xmm9, [rel centroid_four]     ; index increment
    paddd xmm4, xmm9
    ; calculate initial min_dist, xmm5
    movaps xmm5, [r8]
    subps xmm5, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm5, xmm5
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm5, xmm7
    addps xmm5, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    sub ecx, 4
    jna _tail
_loop:
    movaps xmm6, [r8]
    subps xmm6, xmm0
    movaps xmm7, [r9]
    subps xmm7, xmm1
    movaps xmm8, [r10]
    subps xmm8, xmm2
    mulps xmm6, xmm6
    mulps xmm7, xmm7
    mulps xmm8, xmm8
    addps xmm6, xmm7
    addps xmm6, xmm8
    add r8, 16
    add r9, 16
    add r10, 16
    movaps xmm7, xmm6
    cmpps xmm6, xmm5, 1
    minps xmm5, xmm7
    movdqa xmm7, xmm6
    pand xmm6, xmm4
    pandn xmm7, xmm3
    por xmm6, xmm7
    movdqa xmm3, xmm6
    paddd xmm4, xmm9
    sub ecx, 4
    ja _loop
_tail:
    ; calculate horizontal minimum
    pshufd xmm0, xmm5, 0xB1
    minps xmm0, xmm5
    pshufd xmm1, xmm0, 0x4E
    minps xmm0, xmm1
    ; find index of the minimum
    cmpps xmm0, xmm5, 0
    movmskps eax, xmm0
    bsf eax, eax
    ; index into xmm3, sort of
    movaps [rsp + 64], xmm3
    mov eax, [rsp + 64 + rax * 4]
    movaps xmm9, [rsp + 48]
    movaps xmm8, [rsp + 32]
    movaps xmm7, [rsp + 16]
    movaps xmm6, [rsp]
    add rsp, 0x58
    ret
endproc_frame

On the C++ side:

extern "C" int find_closest(int n, float** points, float* reference_point);
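
A hypothetical call site, following the argument layout in the asm comments (n a multiple of 4 and at least 4; the coordinate arrays 16-byte aligned and padded as described above):

// cen_x, cen_y, cen_z: the 16-aligned SoA centroid arrays.
float* coords[3] = { cen_x, cen_y, cen_z };
float  ref[3]    = { pt_xyz[0], pt_xyz[1], pt_xyz[2] };
const int nearest = find_closest(num_centroids, coords, ref);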


Yochai Timmer · 21

You could use the branchless ternary operator, sometimes called bitselect (condition ? true : false).
Just use it for the two members, defaulting to doing nothing.
Don't worry about the extra operations; they're nothing compared to the if statement's branching.

bitselect implementation:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue)
{
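    // 'condition' must be exactly 0 or 1: -condition is then all-zeros or all-ones.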
    return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE
}

inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue)
{
    //Reinterpret floats. Would work because it's just a bit select, no matter the actual value
    int& at = reinterpret_cast<int&>(truereturnvalue);
    int& af = reinterpret_cast<int&>(falsereturnvalue);
    int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE
    return  reinterpret_cast<float&>(res);
}

Your loop should look something like this:

for (int i=0; i < num_centroids; ++i)
{
  const ClusterCentroid& cent = centroids[i];
  const float dist = ...;
  bool isSmaller = dist < pt.min_dist;

  // Keep the same values when not smaller.
  pt.min_index = bitselect(isSmaller, i, pt.min_index);
  pt.min_dist = bitselect(isSmaller, dist, pt.min_dist);
}

  • Please add a comment if you measure the improvement. I think we'd all like to know how it goes. (3 upvotes)
  • Is manipulating (reinterpret-casting) floats like this defined behavior? (3 upvotes)

Arn*_*gel · 10

C++ is a high-level language. Your assumption that control flow in the C++ source code translates into branch instructions is flawed. I don't have the definitions of some types from your example, so I created a simple test program with a similar conditional assignment:

int g(int, int);

int f(const int *arr)
{
    int min = 10000, minIndex = -1;
    for ( int i = 0; i < 1000; ++i )
    {
        if ( arr[i] < min )
        {
            min = arr[i];
            minIndex = i;
        }
    }
    return g(min, minIndex);
}

Note that the undefined 'g' is there merely to prevent the optimizer from deleting everything. Translating this with G++ 4.9.2 with -O3 and -S into x86_64 assembly (without even having to change the default for -march), the (not too surprising) result is that the loop body contains no branches:

movl    (%rdi,%rax,4), %ecx
movl    %edx, %r8d
cmpl    %edx, %ecx
cmovle  %ecx, %r8d
cmovl   %eax, %esi
addq    $1, %rax

Apart from that, the assumption that branchless must be faster may also be flawed, because the probability that a new distance "beats" the old one decreases the more elements you've looked at. It's not a coin toss. The "bitselect" trick was invented back when compilers were far less aggressive about generating "as-if" assembly than they are now. I would rather suggest taking a look at the kind of assembly your compiler actually generates before either trying to rework the code so the compiler can optimize it better, or taking the result as a basis for hand-written assembly. If you want to look into SIMD, I would suggest trying a "minimum of minimums" approach with reduced data dependencies (in my example, the dependencies on "min" are probably a bottleneck).
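
One way to picture the "minimum of minimums" idea is a scalar sketch like this (illustrative only): several independent minimum chains shorten the loop-carried dependency on a single min, and get combined once at the end.

#include <limits>

// Two independent min/index chains over even and odd positions; each
// iteration's compare depends only on its own chain. Assumes n is even.
static int nearest_two_chains(const float* dists, int n)
{
    float m0 = std::numeric_limits<float>::max(), m1 = m0;
    int   i0 = -1, i1 = -1;
    for (int i = 0; i + 1 < n; i += 2)
    {
        if (dists[i]     < m0) { m0 = dists[i];     i0 = i; }
        if (dists[i + 1] < m1) { m1 = dists[i + 1]; i1 = i + 1; }
    }
    return (m0 <= m1) ? i0 : i1;   // combine the two chains once
}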

  • That's true. However, the compiler doesn't always get it right. Compilers can only handle a certain level of complexity, and when the values aren't constants (the way you have them), it's much less obvious to the compiler. If profiling flags it as a problem, bitselect or similar tricks are an option. (2 upvotes)

MikeMB · 6

This could go either way, but I would give the following structure a try:

std::vector<float> centDists(num_centroids); //<-- one for each thread. 
for (size_t p=0; p<num_points; ++p) {
    Point& pt = points[p];
    for (size_t c=0; c<num_centroids; ++c) {
        const float dist = ...;
        centDists[c]=dist;
    }
    pt.min_idx = std::min_element(centDists.begin(), centDists.end()) - centDists.begin();
}

Obviously, you now have to iterate over the memory twice, which may hurt the cache hit rate (you could also split it into sub-ranges), but on the other hand, each of the inner loops should be easy to vectorize and unroll -- so you just have to measure whether it's worth it.

Even if you stick with your version, I would try using local variables to keep track of the minimum index and distance, and apply the results to the point at the end. The rationale is that every read of or write to pt.min_dist effectively happens through a pointer, which, depending on the compiler's optimizations, may or may not hurt your performance.
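
Roughly like this (sketch; the distance computation is filled in from the question's disassembly listing):

// Track the running minimum in locals (registers); write back once.
float min_dist  = numeric_limits<float>::max();
int   min_index = -1;
for (int i = 0; i < num_centroids; ++i)
{
    const ClusterCentroid& cent = centroids[i];
    const float x = pt.pos[0] - cent.pos[0];
    const float y = pt.pos[1] - cent.pos[1];
    const float z = pt.pos[2] - cent.pos[2];
    const float dist = x*x + y*y + z*z;
    if (dist < min_dist) { min_dist = dist; min_index = i; }
}
pt.min_dist  = min_dist;    // single write-back per point
pt.min_index = min_index;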

Another thing that's important for vectorization is converting the array of structs (here, the centroids) into a struct of arrays (e.g., one array per coordinate), because then you don't need extra gather instructions to load the data for use with SIMD instructions. See Eric Brumer's talk for more on that topic.
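
A minimal sketch of that transformation for the centroids (names illustrative):

// AoS, as in the test program: x, y and z are interleaved in memory.
struct Centroid { int count; float xyz[3]; };

// SoA sketch: one contiguous array per coordinate, so four consecutive
// x values (or y, or z) are one aligned 16-byte load away for SIMD.
struct CentroidsSoA
{
    float* x;     // x[0..num_centroids)
    float* y;
    float* z;
    int*   count;
};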

EDIT: Some numbers from my system (Haswell, clang 3.5):
I did a short test with your benchmark on my system, and the code above slowed the algorithm down by about 10% -- essentially, nothing could be vectorized.

However, when the AoS-to-SoA transformation was applied to the centroids, the distance calculation was vectorized, leading to a reduction in overall runtime of about 40% compared to the original structure.


Ada*_*dam · 6

First, I'd suggest looking at the disassembly of an optimized build before trying any code changes. Ideally, you want to look at the profiler data at the assembly level. That can show up various things, for example:

  1. The compiler may not have generated an actual branch instruction.
  2. The line of code that has the bottleneck may have more instructions associated with it than you might think -- the dist calculation, for example.

Beyond that there are the standard tricks: when you're talking about distance calculations, they usually involve a square root. You should take that square root at the end of the process, on the minimum squared value.
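
For example (sketch; your code already compares squared distances, so this only matters where a true distance is needed at the end):

#include <cmath>
#include <limits>

// sqrt is monotonic, so the nearest candidate under sqrt(d) is also the
// nearest under d: search on squared values, take one root at the end.
static float nearest_distance(const float* sq_dists, int n)
{
    float min_sq = std::numeric_limits<float>::max();
    for (int i = 0; i < n; ++i)
        if (sq_dists[i] < min_sq)
            min_sq = sq_dists[i];
    return std::sqrt(min_sq);   // one sqrt total, not one per candidate
}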

SSE can process four values at once using _mm_min_ps, without any branches. If you really need speed, then you want to be using SSE (or AVX) intrinsics. Here's a basic example:

  float MinimumDistance(const float *values, int count)
  {
    __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX);
    int i=0;
    for (; i < count - 3; i+=4)
    {
        __m128 distances = _mm_loadu_ps(&values[i]);
        min = _mm_min_ps(min, distances);
    }
    // Combine the four separate minimums to a single value
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1)));
    min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2)));

    // Deal with the last 0-3 elements the slow way
    float result = FLT_MAX;
    if (count > 3) _mm_store_ss(&result, min);
    for (; i < count; i++)
    {
        result = std::min(values[i], result); // qualified: the local __m128 'min' shadows the function
    }

    return result;
  }

For the best SSE performance, you should make sure the loads happen at aligned addresses. If necessary, you can handle the first few unaligned elements the same way the code above handles the last few.

Another thing to keep in mind is memory bandwidth. If several members of the ClusterCentroid structure aren't used during that loop, then you're reading more data from memory than you actually need, since memory is read in cache-line-sized chunks, each of which is 64 bytes.
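
For example, a hot/cold split along these lines (sketch; the split and field names are illustrative):

// Keep only what the nearest-centroid search touches packed densely, so
// each 64-byte cache line carries ~5 usable positions instead of
// positions interleaved with members the loop never reads.
struct CentroidHot  { float x, y, z; };   // 12 bytes, read every iteration
struct CentroidCold { int count; };       // touched only between passes

CentroidHot*  hot;    // hot[i]:  scanned by the nearest-centroid search
CentroidCold* cold;   // cold[i]: updated during the averaging pass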