计算包含高维向量的两个矩阵之间的最小欧氏距离的最快方法

min*_*oon 6 c++ performance opencv matrix-multiplication eigen

我在另一个主题上开始了类似的问题,但后来我专注于如何使用OpenCV.由于未能实现我原先想要的东西,我会在这里问到我想要的东西.

我有两个矩阵.矩阵a为2782x128,矩阵b为4000x128,均为无符号字符值.值存储在单个数组中.对于a中的每个向量,我需要b中具有最接近的欧氏距离的向量的索引.

好的,现在我的代码实现了这个:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}
Run Code Online (Sandbox Code Playgroud)

附带的是带有样本矩阵的文件.

matrixa matrixb

我正在使用windows.h来计算消耗时间,所以如果你想在另一个平台上测试代码而不是windows,只需更改windows.h标题并改变计算消耗时间的方式.

我的电脑中的这段代码约为0.5秒.问题是我在Matlab中有另一个代码在0.05秒内完成同样的事情.在我的实验中,我每秒都会收到几个像矩阵一样的矩阵,所以0.5秒就太多了.

现在用matlab代码来计算:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);
Run Code Online (Sandbox Code Playgroud)

好.Matlab代码使用的是(xa)^ 2 = x ^ 2 + a ^ 2 - 2ab.

所以我的下一次尝试是做同样的事情.我删除了自己的代码进行相同的计算,但是大约是1.2秒.

然后,我尝试使用不同的外部库.第一次尝试是Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}
Run Code Online (Sandbox Code Playgroud)

这个特征代码的成本约为1.2,表示:ab = a*b.transpose();

使用opencv的类似代码也被使用,并且ab = a*b.transpose()的成本; 是0.65秒.

所以,matlab能够如此快速地完成同样的事情并且我无法使用C++真的很烦人!当然能够运行我的实验会很棒,但我认为缺乏知识真的让我烦恼.如何实现至少与Matlab相同的性能?任何类型的溶解都是受欢迎的.我的意思是,任何外部库(如果可能的话免费),循环展开东西,模板东西,SSE intructions(我知道它们存在),缓存东西.正如我所说,我的主要目的是增加我的知识,因为能够以更快的速度编写这样的代码.

提前致谢

编辑:David Hammen建议的更多代码.在进行任何计算之前,我将数组转换为int.这是代码:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在整个过程为0.6,开始时的铸造循环为0.001秒.也许我做错了什么?

EDIT2:关于Eigen的一切?当我寻找外部文库时,他们总是谈论Eigen及其速度.我做错了什么?这里使用Eigen的简单代码显示它不是那么快.也许我错过了一些配置或一些旗帜,或者......

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;
Run Code Online (Sandbox Code Playgroud)

这段代码约为0.9秒.

Dav*_*men 2

在 C++ 代码中肯定会伤害您的一件事是它有大量 char 到 int 的转换。通过船载,我的意思是最多 2*2782*4000*128 字符到 int 的转换。那些char转换int很慢,非常慢。

您可以通过分配一对数组(一个 2782*128 和另一个 4000*128)来将其减少到 (2782+4000)*128 此类转换,以包含和int数组的强制转换为整数内容。使用这些数组而不是您的数组。char* achar* bint*char*

另一个问题可能是您使用intvs long。我不在 Windows 上工作,所以这可能不适用。在我使用的机器上,以前int是 32 位,long现在是 64 位。32 位绰绰有余,因为 255*255*128 < 256*256*128 = 2 23

这显然不是问题所在。

令人惊讶的是,所讨论的代码并没有计算 Matlab 代码创建的巨大的 2728 x 4000 数组。更引人注目的是,Matlab 很可能使用双精度数而不是整数来实现这一点——而且它仍然击败了 C/C++ 代码。

一大问题是缓存。那个 4000*128 数组对于 1 级缓存来说太大了,并且您要迭代这个大数组 2782 次。您的代码在内存上等待太多。要解决此问题,请使用较小的数组块,b以便您的代码尽可能长时间地使用 1 级缓存。

另一个问题是优化if (distance>min_distance) break;。我怀疑这实际上是一种不优化。if在最内层循环中进行测试通常是一个坏主意。尽快遍历该内部产品。除了浪费计算之外,摆脱这个测试没有什么坏处。有时,如果这样做可以删除最内层循环中的分支,那么最好进行明显不需要的计算。这是其中之一。您也许可以通过消除此测试来解决您的问题。尝试这样做。

回到缓存问题,您需要摆脱这个分支,以便可以将ab矩阵上的操作拆分为更小的块,一次不超过 256 行的块。这就是两个现代 Intel 芯片的 L1 缓存之一可以容纳多少行 128 个无符号字符。由于 250 可除 4000,因此请考虑从逻辑上将该b矩阵分为 16 个块。您可能很想形成 2872 x 4000 的大内积数组,但要分成小块进行。您可以将其添加if (distance>min_distance) break;回来,但要在块级别而不是逐字节级别执行此操作。

您应该能够击败 Matlab,因为它几乎肯定可以使用双精度数,但您可以使用无符号字符和整数。