执行矢量c ++的交集

Ste*_*ner 6 c++ vector c++11

我有200个存储在vecOfVec中的大小为1到4000000的向量.我需要将这些向量与大小为9000+元素的单个向量"vecSearched"相交.我尝试使用以下代码执行相同操作,但是使用perf工具我发现我正在做的交叉点是我的代码中的瓶颈.我是否有某种方式可以执行有效的交叉路口

#include <cstdlib>
#include <iostream>
#include <vector>

using namespace std;

int main(int argc, char** argv) {
  vector<vector<unsigned> > vecOfVec; //contains 120 vectors of size ranging from 1 to 2000000 elements. All vectors in vecOfVec are sorted
  vector<unsigned> vecSearched; //vector searched in contains 9000+ elements. Vectors in vecSearched are sorted
  for(unsigned kbt=0; kbt<vecOfVec.size(); kbt++)
  {                                  
     //first find first 9 values spaced at equi-distant places, use these 9 values for performing comparisons
     vector<unsigned> equiSpacedVec;                             

     if(((vecSearched[0]))>vecOfVec[kbt][(vecOfVec[kbt].size())-1]) //if beginning of searched vector > last value present in individual vectors of vecOfVec then continue
     {
         continue;                             
     }                         

     unsigned elementIndex=0; //used for iterating over equiSpacedVec                             
     unsigned i=0; //used for iterating over individual buckets vecOfVec[kbt].second
     //search for value in bucket and store it in bucketValPos
     bool firstRun=true;             
     for(vector<unsigned>::iterator itValPos=vecSearched.begin();itValPos!=vecSearched.end();++itValPos)
     {
         //construct a summarized vector out of individual vectors of vecOfVec
         if(firstRun)
         {
             firstRun=false;
             unsigned elementIndex1=0; //used for iterating over equiSpacedVec
             while(elementIndex1<(vecOfVec[kbt].size())) //create a small vector for skipping over the remaining vectors
             {
                  if((elementIndex1+(10000))<(vecOfVec[kbt].size()))
                     elementIndex1+=10000;
                  else
                     break;
                 equiSpacedVec.push_back(vecOfVec[kbt][elementIndex1]);
             }          
         }
         //skip individual vectors of vecOfVec using summarized vector constructed above
         while((!(equiSpacedVec.empty()))&&(equiSpacedVec.size()>(elementIndex+1))&&((*itValPos)>equiSpacedVec[elementIndex+1])){
             elementIndex+=1;
             if((i+100)<(vecOfVec[kbt].size()))
                 i+=100;
         }            
         unsigned j=i;
         while(((*itValPos)>vecOfVec[kbt][j])&&(j<vecOfVec[kbt].size())){
             j++;
         }
         if(j>(vecOfVec[kbt].size()-1)) //element not found even at last position.
         {
             break;
         }              

         if((*itValPos)==vecOfVec[kbt][j])
         {                                     
             //store intersection result
         }
     }
  }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

Nik*_*iou 5

你的问题非常受欢迎.由于没有关联向量的数据,因此可以归结为加速两个向量之间的交集,基本上有两种方法:

1.没有任何预处理

这通常由三件事来解决:

  • 重复比较次数.例如,对于小向量(大小为1到50),您应该二元搜索每个元素以避免遍历主题向量的所有9000+元素.

  • 提高代码质量以减少分支错误预测,例如观察结果集通常比输入集更小的大小可以转换这样的代码:

    while (Apos < Aend && Bpos < Bend) {
        if (A[Apos] == B[Bpos]) {
            C[Cpos++] = A[Apos];
            Apos++; Bpos++;
        }
        else if (A[Apos] > B[Bpos]) {
            Bpos++;
        } 
        else {
          Apos++;
        } 
    }
    
    Run Code Online (Sandbox Code Playgroud)

    代码"展开"这样的比较创建虽然更容易预测分支(例如块大小= 2):

    while (1) {
        Adat0 = A[Apos]; Adat1 = A[Apos + 1]; 
        Bdat0 = B[Bpos]; Bdat1 = B[Bpos + 1];
        if (Adat0 == Bdat0) { 
            C[Cpos++] = Adat0;
        }
        else if (Adat0 == Bdat1) {
            C[Cpos++] = Adat0;
            goto advanceB;
        }
        else if (Adat1 == Bdat0) {
            C[Cpos++] = Adat1;
            goto advanceA;
        }
        if (Adat1 == Bdat1) {
            C[Cpos++] = Adat1;
            goto advanceAB;
        }
        else if (Adat1 > Bdat1) goto advanceB;
        else goto advanceA;
    advanceA:
        Apos+=2;
        if (Apos >= Aend) { break; } else { continue; }
    advanceB:
        Bpos+=2;
        if (Bpos >= Bend) { break; } else { continue; }
    advanceAB:
        Apos+=2;  Bpos+=2;
        if (Apos >= Aend || Bpos >= Bend) { break; }
    }
    //  fall back to naive algorithm for remaining elements
    
    Run Code Online (Sandbox Code Playgroud)
  • 使用SIMD指令执行块操作

这些技术很难在质量保证方面来描述,但你可以看到他们(加上相关的优化,如if conversion)在这里这里或发现实施的元素在这里

2.进行预处理

这个恕我直言是一个更好的方式,因为你有一个大小9000+元素的单一主题向量.您可以从中创建一个间隔树,或者只是找到一种索引它的方法,例如,创建一个可以加快搜索速度的结构:

vector<unsigned> subject(9000+); 
vector<range>    index(9000/M); 
Run Code Online (Sandbox Code Playgroud)

范围是一个类似的结构

struct range {
    unsigned min, max; 
}; 
Run Code Online (Sandbox Code Playgroud)

从而创建一系列范围

[0, 100], [101, 857], ... [33221, 33500]
Run Code Online (Sandbox Code Playgroud)

这将允许在进行交集时跳过许多比较(例如,如果另一组的元素大于子范围的最大值,则可以完全跳过该子范围)

3.并行化

是的,在两个列表中总是有第三个元素:P.如果您已经对程序进行了足够的优化(并且只有那时),请将您的工作分解为块并并行运行.这个问题符合一个令人尴尬的模式,所以200个向量对比1应该定义为"50对1并发4次"

测试,测量,重新设计!!


mar*_*rom 4

如果我正确理解你的代码,那么如果 N 是向量的数量,M 是每个向量内元素的数量,那么你的算法大约是 O(N * M^2)。然后是“桶”策略,它可以稍微改善一些情况,但其效果很难第一眼评估。

我建议您处理排序向量并在排序向量上进行交集。像这样的东西:

vector<vector<unsigned> > vecOfVec;
vector<unsignend> vecSearched ;
for (vector<unsigned> v : vecSearched) // yes, a copy
{
   std::sort(v.begin(), v.end()) ;
   if (vecSearched.empty()) // first run detection
      vSearched = v ; 
   else
   {  // compute intersection of v and vecSearch
      auto vit = v.begin() ;
      auto vend = v.end() ;
      auto sit = vecSearched.begin() ;
      auto send = vecSearched.end() ;
      vector<unsiged> result ;
      while (vit != vend && sit != send)
      {
         if (*vit < *sit)
             vit++ ;
         else if (*vit == *sit)
         {
             result.push_bck(*it) ; 
             ++vit ;
             ++sit ;
         }
         else //  *vit > *sit
            ++sit ;
      }
      vecSearched = result ;
   }
}
Run Code Online (Sandbox Code Playgroud)

代码未经测试,无论如何,其背后的想法是排序向量的交集更容易,因为您可以比较这两个迭代器(vit,sit)并增长指向较小迭代器的迭代器。所以交集在 M 中是线性的,整个复杂度是 O(N * M *log(M)),其中 log(M) 是由于排序引起的