numpy 怎么这么快?

jer*_*111 61 c c++ arrays performance numpy

numpy基于我与优化的 C/C++ 代码的令人震惊的比较,我试图了解怎么能这么快,这仍然远不能重现 numpy 的速度。

考虑以下示例:给定一个带有shape=(N, N)和的二维数组dtype=float32,它表示 N 维的 N 个向量的列表,我正在计算每对向量之间的成对差异。使用numpy广播,这简单地写为:

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]
Run Code Online (Sandbox Code Playgroud)

使用timeit我可以测量性能N=512:在我的笔记本电脑上每次通话需要 88 毫秒。

现在,在 C/C++ 中,一个简单的实现写为:

#define X(i, j)     _X[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}
Run Code Online (Sandbox Code Playgroud)

使用gcc带有-O3标志的7.3.0 进行编译,每次调用 for 的时间为 195 毫秒pairwise_sub_naive(X),考虑到代码的简单性,这还算不错,但比numpy.

现在我开始认真起来并添加一些小的优化,通过直接索引行向量:

float* pairwise_sub_better( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}
Run Code Online (Sandbox Code Playgroud)

速度保持不变,为 195 毫秒,这意味着编译器能够计算这么多。现在让我们使用 SIMD 向量指令:

float* pairwise_sub_simd( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}
Run Code Online (Sandbox Code Playgroud)

这只会产生很小的提升(每个函数调用 178 毫秒而不是 194 毫秒)。

然后我想知道“块式”方法(例如用于优化点积的方法)是否有益:

float* pairwise_sub_blocks( const float* _X, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}
Run Code Online (Sandbox Code Playgroud)

令人惊讶的是,这是迄今为止最慢的方法(每个函数调用 258 毫秒)。

总而言之,尽管对一些优化的 C++ 代码做了一些努力,但我无法接近 numpy 毫不费力地实现的 88 毫秒/调用。知道为什么吗?

注意:顺便说一下,我禁用了numpy多线程,无论如何,这种操作不是多线程的。

编辑:用于对 numpy 代码进行基准测试的确切代码:

import numpy as np

def pairwise_sub_numpy( X ):
    return X - X[:, None, :]

N = 512
X = np.random.rand(N,N).astype(np.float32)

import timeit
times = timeit.repeat('pairwise_sub_numpy( X )', globals=globals(), number=1, repeat=5)
print(f">> best of 5 = {1000*min(times):.3f} ms")
Run Code Online (Sandbox Code Playgroud)

C 代码的完整基准:

#include <stdio.h>
#include <string.h>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>
#include <time.h>


#define X(i, j)     _x[(i)*N + (j)]
#define res(i, j, k)  _res[((i)*N + (j))*N + (k)]

float* pairwise_sub_naive( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            for (int k = 0; k < N; k++)
                res(i,j,k) = X(i,k) - X(j,k);
          }
    }
    return _res;
}

float* pairwise_sub_better( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));
    
    for (int i = 0; i < N; i++) {
        const float* xi = & X(i,0);

        for (int j = 0; j < N; j++) {
            const float* xj = & X(j,0);
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k++)
                 r[k] = xi[k] - xj[k];
        }
    }
    return _res;
}

float* pairwise_sub_simd( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    // create caches for row vectors which are memory-aligned
    float* xi = (float*)aligned_alloc(32, N * sizeof(float));
    float* xj = (float*)aligned_alloc(32, N * sizeof(float));
    
    for (int i = 0; i < N; i++) {
        memcpy(xi, & X(i,0), N*sizeof(float));
        
        for (int j = 0; j < N; j++) {
            memcpy(xj, & X(j,0), N*sizeof(float));
            
            float* r = &res(i,j,0);
            for (int k = 0; k < N; k += 256/sizeof(float)) {
                const __m256 A = _mm256_load_ps(xi+k);
                const __m256 B = _mm256_load_ps(xj+k);
                _mm256_store_ps(r+k, _mm256_sub_ps( A, B ));
            }
        }
    }
    free(xi); 
    free(xj);
    return _res;
}


float* pairwise_sub_blocks( const float* _x, int N ) 
{
    float* _res = (float*) aligned_alloc( 32, N*N*N*sizeof(float));

    #define B 8
    float cache1[B*B], cache2[B*B];

    for (int bi = 0; bi < N; bi+=B)
      for (int bj = 0; bj < N; bj+=B)
        for (int bk = 0; bk < N; bk+=B) {
        
            // load first 8x8 block in the cache
            for (int i = 0; i < B; i++)
              for (int k = 0; k < B; k++)
                cache1[B*i + k] = X(bi+i, bk+k);

            // load second 8x8 block in the cache
            for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                cache2[B*j + k] = X(bj+j, bk+k);

            // compute local operations on the caches
            for (int i = 0; i < B; i++)
             for (int j = 0; j < B; j++)
              for (int k = 0; k < B; k++)
                 res(bi+i,bj+j,bk+k) = cache1[B*i + k] - cache2[B*j + k];
         }
    return _res;
}

int main() 
{
    const int N = 512;
    float* _x = (float*) malloc( N * N * sizeof(float) );
    for( int i = 0; i < N; i++)
      for( int j = 0; j < N; j++)
        X(i,j) = ((i+j*j+17*i+101) % N) / float(N);

    double best = 9e9;
    for( int i = 0; i < 5; i++)
    {
        struct timespec start, stop;
        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start);
        
        //float* res = pairwise_sub_naive( _x, N );
        //float* res = pairwise_sub_better( _x, N );
        //float* res = pairwise_sub_simd( _x, N );
        float* res = pairwise_sub_blocks( _x, N );

        clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop);

        double t = (stop.tv_sec - start.tv_sec) * 1e6 + (stop.tv_nsec - start.tv_nsec) / 1e3;    // in microseconds
        if (t < best) best = t;
        free( res );
    }
    printf("Best of 5 = %f ms\n", best / 1000);

    free( _x );
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

使用 gcc 7.3.0 编译 gcc -Wall -O3 -mavx -msse4.1 -o test_simd test_simd.c

我的机器上的计时摘要:

执行 时间
麻木的 88 毫秒
C++ 幼稚 194 毫秒
C++ 更好 195 毫秒
C++ SIMD 178 毫秒
C++ 阻塞 258 毫秒
C++ 阻塞 (gcc 8.3.1) 217 毫秒

cel*_*vek 31

正如一些评论所指出的,numpy 在其实现中使用 SIMD,并且它不会在计算点分配内存。如果我从您的实现中消除内存分配,在计算之前预先分配所有缓冲区,那么即使使用缩放版本(即没有任何优化的版本),与 numpy 相比,我也会获得更好的时间。

同样在 SIMD 方面以及为什么您的实现没有比缩放器好得多的原因是因为您的内存访问模式对于 SIMD 使用来说并不理想-您执行 memcopy 并从彼此相距很远的位置加载到 SIMD 寄存器中-例如您从第 0 行和第 511 行填充向量,这可能无法与缓存或 SIMD 预取器配合使用。

加载 SIMD 寄存器的方式也存在错误(如果我正确理解了您要计算的内容):256 位 SIMD 寄存器可以加载 8 个单精度浮点数8 * 32 = 256,但在您的循环你通过"256/sizeof(float)"256/4 = 64跳 k ;_x_res是浮点指针,SIMD 内部函数也期望浮点指针作为参数,因此不是每 8 个浮点读取这些行中的所有元素,而是每 64 个浮点读取它们。

可以通过更改访问模式来进一步优化计算,但也可以通过观察您重复一些计算来优化:例如,当以line0作为基础进行迭代时,您计算line0 - line1但在未来某个时间,当以line1作为基础进行迭代时,您需要计算line1 - line0基本上是-(line0 - line1),也就是说,对于line0之后的每一行,可以从以前的计算中重用很多结果。很多时候,SIMD 使用或并行化需要改变访问或推理数据的方式,以提供有意义的改进。

这是我根据您的初始实现所做的第一步,它比 numpy 更快(不要介意 OpenMP 的东西,因为它不是应该如何完成的,我只是想看看它的行为如何尝试幼稚的方式)。

C++
Time scaler version: 55 ms
Time SIMD version: 53 ms
**Time SIMD 2 version: 33 ms**
Time SIMD 3 version: 168 ms
Time OpenMP version: 59 ms

Python numpy
>> best of 5 = 88.794 ms


#include <cstdlib>
#include <xmmintrin.h>   // compile with -mavx -msse4.1
#include <pmmintrin.h>
#include <immintrin.h>

#include <numeric>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <cstring>

using namespace std;

float* pairwise_sub_naive (const float* input, float* output, int n) 
{
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < n; j++) {
            for (int k = 0; k < n; k++)
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
          }
    }
    return output;
}

float* pairwise_sub_simd (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    
    return output;
}

float* pairwise_sub_simd_2 (const float* input, float* output, int n) 
{
    float* line_buffer = (float*) aligned_alloc(32, n * sizeof(float));

    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int j = 0; j < n; j++)
        {
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(line_buffer + k, _mm256_sub_ps( A, B ));
            }
            memcpy(output + outidx * n, line_buffer, n);
        }
    }
    
    return output;
}

float* pairwise_sub_simd_3 (const float* input, float* output, int n) 
{    
    for (int i = 0; i < n; i++) 
    {
        const int idxi = i * n;
        for (int k = 0; k < n; k += 8) 
        {
            __m256 A = _mm256_load_ps(input + idxi + k);
            for (int j = 0; j < n; j++)
            {
                const int idxj = j * n;
                const int outidx = (idxi + j) * n;
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx + k, _mm256_sub_ps( A, B     ));
             }
        }
    }

    return output;
}

float* pairwise_sub_openmp (const float* input, float* output, int n)
{
    int i, j;
    #pragma omp parallel for private(j)
    for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++)
        {
            const int idxi = i * n; 
            const int idxj = j * n;
            const int outidx = idxi + j;
            for (int k = 0; k < n; k += 8) 
            {
                __m256 A = _mm256_load_ps(input + idxi + k);
                __m256 B = _mm256_load_ps(input + idxj + k);
                _mm256_store_ps(output + outidx * n + k, _mm256_sub_ps( A, B ));
            }
        }
    }
    /*for (i = 0; i < n; i++) 
    {
        for (j = 0; j < n; j++) 
        {
            for (int k = 0; k < n; k++)
            {
                output[(i * n + j) * n + k] = input[i * n + k] - input[j * n + k];
            }
        }
    }*/
    
    return output;
}

int main ()
{
    constexpr size_t n = 512;
    constexpr size_t input_size = n * n;
    constexpr size_t output_size = n * n * n;

    float* input = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_simd = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_simd = (float*) aligned_alloc(32, output_size * sizeof(float));

    float* input_par = (float*) aligned_alloc(32, input_size * sizeof(float));
    float* output_par = (float*) aligned_alloc(32, output_size * sizeof(float));

    iota(input, input + input_size, float(0.0));
    fill(output, output + output_size, float(0.0));

    iota(input_simd, input_simd + input_size, float(0.0));
    fill(output_simd, output_simd + output_size, float(0.0));
    
    iota(input_par, input_par + input_size, float(0.0));
    fill(output_par, output_par + output_size, float(0.0));

    std::chrono::milliseconds best_scaler{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_naive(input, output, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
        if (duration < best_scaler)
        {
            best_scaler = duration;
        }
    }
    cout << "Time scaler version: " << best_scaler.count() << " ms\n";

    std::chrono::milliseconds best_simd{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd)
    {
        best_simd = duration;
    }
}
cout << "Time SIMD version: " << best_simd.count() << " ms\n";

std::chrono::milliseconds best_simd_2{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_2(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_2)
    {
        best_simd_2 = duration;
    }
}
cout << "Time SIMD 2 version: " << best_simd_2.count() << " ms\n";

std::chrono::milliseconds best_simd_3{100000};
for (int i = 0; i < 5; ++i)
{
    auto start = chrono::high_resolution_clock::now();
    pairwise_sub_simd_3(input_simd, output_simd, n);
    auto stop = chrono::high_resolution_clock::now();

    auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
     if (duration < best_simd_3)
    {
        best_simd_3 = duration;
    }
}
cout << "Time SIMD 3 version: " << best_simd_3.count() << " ms\n";

    std::chrono::milliseconds best_par{100000};
    for (int i = 0; i < 5; ++i)
    {
        auto start = chrono::high_resolution_clock::now();
        pairwise_sub_openmp(input_par, output_par, n);
        auto stop = chrono::high_resolution_clock::now();

        auto duration = chrono::duration_cast<chrono::milliseconds>(stop - start);
         if (duration < best_par)
        {
            best_par = duration;
        }
    }
    cout << "Time OpenMP version: " << best_par.count() << " ms\n";

    cout << "Verification\n";
    if (equal(output, output + output_size, output_simd))
    {
        cout << "PASSED\n";
    }
    else
    {
        cout << "FAILED\n";
    }

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

编辑:小的更正,因为存在与 SIMD 实现的第二个版本相关的错误调用。

正如您现在所看到的,第二个实现是最快的,因为从缓存引用的局部性的角度来看,它的表现最好。SIMD 实现的示例 2 和 3 向您说明如何更改内存访问模式以影响 SIMD 优化的性能。总而言之(知道我的建议还远未完成)请注意您的内存访问模式以及从 SIMD 单元加载和存储的内容;SIMD 是处理器核心内的不同硬件单元,因此来回混洗数据会受到惩罚,因此当您从内存加载寄存器时,请尝试对该数据执行尽可能多的操作,并且不要太急于存储它回来了(当然,在你的例子中,这可能就是你需要对数据做的所有事情)。还请注意,可用的 SIMD 寄存器数量有限,如果加载太多,它们就会“溢出”,也就是说,它们将在幕后存储回主内存中的临时位置,从而扼杀您的所有收益。SIMD 优化,这是一个真正的平衡行为!

有一些努力将跨平台内在包装器放入标准中(我在辉煌的过去为自己开发了一个封闭源代码),即使它远未完成,也值得一看(如果您愿意,请阅读随附的论文)真正有兴趣了解 SIMD 的工作原理)。 https://github.com/VcDevel/std-simd


jer*_*111 9

这是对@celakev 发布的答案的补充。我想我终于明白到底是什么问题了。问题在于在执行计算的主函数中分配内存。

真正需要时间的是访问新的(新鲜的)内存。我相信malloc调用返回的内存页是虚拟的,即不对应于实际物理内存——直到它被显式访问。实际上需要时间的是在函数代码中访问物理内存时动态分配物理内存的过程(我认为这是操作系统级别)。

这是一个证明。考虑以下两个微不足道的函数:

#include <stdio.h>
#include <stdlib.h>
#include <time.h>

float* just_alloc( size_t N ) 
{
    return (float*) aligned_alloc( 32, sizeof(float)*N );
}

void just_fill( float* _arr, size_t N ) 
{
    for (size_t i = 0; i < N; i++)
        _arr[i] = 1;
}

#define Time( code_to_benchmark, cleanup_code ) \
    do { \
        double best = 9e9; \
        for( int i = 0; i < 5; i++) { \
            struct timespec start, stop; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &start); \
            code_to_benchmark; \
            clock_gettime(CLOCK_THREAD_CPUTIME_ID, &stop); \
            double t = (stop.tv_sec - start.tv_sec) * 1e3 + (stop.tv_nsec - start.tv_nsec) / 1e6; \
            printf("Time[%d] = %f ms\n", i, t); \
            if (t < best) best = t; \
            cleanup_code; \
        } \
        printf("Best of 5 for '" #code_to_benchmark "' = %f ms\n\n", best); \
    } while(0)

int main() 
{
    const size_t N = 512;

    Time( float* arr = just_alloc(N*N*N), free(arr) );
    
    float* arr = just_alloc(N*N*N);
    Time( just_fill(arr, N*N*N), ; );
    free(arr);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

我得到以下时间,我现在详细说明每个调用:

Time[0] = 0.000931 ms
Time[1] = 0.000540 ms
Time[2] = 0.000523 ms
Time[3] = 0.000524 ms
Time[4] = 0.000521 ms
Best of 5 for 'float* arr = just_alloc(N*N*N)' = 0.000521 ms

Time[0] = 189.822237 ms
Time[1] = 45.041083 ms
Time[2] = 46.331428 ms
Time[3] = 44.729433 ms
Time[4] = 42.241279 ms
Best of 5 for 'just_fill(arr, N*N*N)' = 42.241279 ms

Run Code Online (Sandbox Code Playgroud)

如您所见,分配内存非常快,但第一次访问内存时,它比其他时间慢 5 倍。所以,基本上我的代码很慢的原因是因为我每次都重新分配没有物理地址的新内存。(如果我错了,请纠正我,但我认为这就是要点!)

  • “所以,基本上我的代码速度慢的原因是因为我每次都重新分配还没有物理地址的新内存。(如果我错了,请纠正我,但我认为这就是它的要点!)”所以它是关于最后分配内存:) (3认同)