当95%的情况下的值为0或1时,对非常大的数组进行随机访问的任何优化?

Joh*_*nAl 132 c++ arrays optimization performance memory-bandwidth

在一个非常大的阵列上是否有任何可能的随机访问优化(我目前使用uint8_t,我问的是什么更好)

uint8_t MyArray[10000000];
Run Code Online (Sandbox Code Playgroud)

当数组中任何位置的值为

  • 95%的情况下为01,
  • 4%的情况下 2 %
  • 在另外1%的情况下,3255之间?

那么,有没有什么比uint8_t用于此的数组更好?它应该尽可能快地以随机顺序循环遍历整个阵列,并且这对RAM带宽非常重,因此当有多个线程同时为不同的阵列执行时,当前整个RAM带宽很快就饱和了.

我问,因为实际上已知除了5%之外几乎所有的值都是0或1时,拥有如此大的数组(10 MB)效率非常低效.因此当数组中所有值的95%时实际上只需要1位而不是8位,这会将内存使用量减少几乎一个数量级.感觉必须有一个更节省内存的解决方案,这将大大减少为此所需的RAM带宽,因此随机访问也会明显更快.

Mat*_*lia 155

想到的一个简单的可能性是为常见情况保留每个值2位的压缩数组,每个值分开4个字节(原始元素索引为24位,实际值为8位,因此(idx << 8) | value))排序数组为其他的.

查找值时,首先在2bpp数组中查找(O(1)); 如果你找到0,1或2,那就是你想要的价值; 如果找到3,则意味着您必须在辅助阵列中查找它.在这里,您将执行二分搜索,以查找左移8的感兴趣的索引(O(log(n),小n,因为这应该是1%),并从4-中提取值字节的东西.

std::vector<uint8_t> main_arr;
std::vector<uint32_t> sec_arr;

uint8_t lookup(unsigned idx) {
    // extract the 2 bits of our interest from the main array
    uint8_t v = (main_arr[idx>>2]>>(2*(idx&3)))&3;
    // usual (likely) case: value between 0 and 2
    if(v != 3) return v;
    // bad case: lookup the index<<8 in the secondary array
    // lower_bound finds the first >=, so we don't need to mask out the value
    auto ptr = std::lower_bound(sec_arr.begin(), sec_arr.end(), idx<<8);
#ifdef _DEBUG
    // some coherency checks
    if(ptr == sec_arr.end()) std::abort();
    if((*ptr >> 8) != idx) std::abort();
#endif
    // extract our 8-bit value from the 32 bit (index, value) thingie
    return (*ptr) & 0xff;
}

void populate(uint8_t *source, size_t size) {
    main_arr.clear(); sec_arr.clear();
    // size the main storage (round up)
    main_arr.resize((size+3)/4);
    for(size_t idx = 0; idx < size; ++idx) {
        uint8_t in = source[idx];
        uint8_t &target = main_arr[idx>>2];
        // if the input doesn't fit, cap to 3 and put in secondary storage
        if(in >= 3) {
            // top 24 bits: index; low 8 bit: value
            sec_arr.push_back((idx << 8) | in);
            in = 3;
        }
        // store in the target according to the position
        target |= in << ((idx & 3)*2);
    }
}
Run Code Online (Sandbox Code Playgroud)

对于像你提出的那样的数组,第一个数组应该是10000000/4 = 2500000字节,第二个数组需要10000000*1%*4 B = 400000字节; 因此2900000字节,即小于原始数组的三分之一,并且最常用的部分全部保存在内存中,这应该适合缓存(它甚至可以适合L3).

如果您需要超过24位寻址,则必须调整"二级存储"; 扩展它的一个简单方法是使用256个元素指针数组来切换索引的前8位,并转发到24位索引排序数组,如上所述.


快速基准

#include <algorithm>
#include <vector>
#include <stdint.h>
#include <chrono>
#include <stdio.h>
#include <math.h>

using namespace std::chrono;

/// XorShift32 generator; extremely fast, 2^32-1 period, way better quality
/// than LCG but fail some test suites
struct XorShift32 {
    /// This stuff allows to use this class wherever a library function
    /// requires a UniformRandomBitGenerator (e.g. std::shuffle)
    typedef uint32_t result_type;
    static uint32_t min() { return 1; }
    static uint32_t max() { return uint32_t(-1); }

    /// PRNG state
    uint32_t y;

    /// Initializes with seed
    XorShift32(uint32_t seed = 0) : y(seed) {
        if(y == 0) y = 2463534242UL;
    }

    /// Returns a value in the range [1, 1<<32)
    uint32_t operator()() {
        y ^= (y<<13);
        y ^= (y>>17);
        y ^= (y<<15);
        return y;
    }

    /// Returns a value in the range [0, limit); this conforms to the RandomFunc
    /// requirements for std::random_shuffle
    uint32_t operator()(uint32_t limit) {
        return (*this)()%limit;
    }
};

struct mean_variance {
    double rmean = 0.;
    double rvariance = 0.;
    int count = 0;

    void operator()(double x) {
        ++count;
        double ormean = rmean;
        rmean     += (x-rmean)/count;
        rvariance += (x-ormean)*(x-rmean);
    }

    double mean()     const { return rmean; }
    double variance() const { return rvariance/(count-1); }
    double stddev()   const { return std::sqrt(variance()); }
};

std::vector<uint8_t> main_arr;
std::vector<uint32_t> sec_arr;

uint8_t lookup(unsigned idx) {
    // extract the 2 bits of our interest from the main array
    uint8_t v = (main_arr[idx>>2]>>(2*(idx&3)))&3;
    // usual (likely) case: value between 0 and 2
    if(v != 3) return v;
    // bad case: lookup the index<<8 in the secondary array
    // lower_bound finds the first >=, so we don't need to mask out the value
    auto ptr = std::lower_bound(sec_arr.begin(), sec_arr.end(), idx<<8);
#ifdef _DEBUG
    // some coherency checks
    if(ptr == sec_arr.end()) std::abort();
    if((*ptr >> 8) != idx) std::abort();
#endif
    // extract our 8-bit value from the 32 bit (index, value) thingie
    return (*ptr) & 0xff;
}

void populate(uint8_t *source, size_t size) {
    main_arr.clear(); sec_arr.clear();
    // size the main storage (round up)
    main_arr.resize((size+3)/4);
    for(size_t idx = 0; idx < size; ++idx) {
        uint8_t in = source[idx];
        uint8_t &target = main_arr[idx>>2];
        // if the input doesn't fit, cap to 3 and put in secondary storage
        if(in >= 3) {
            // top 24 bits: index; low 8 bit: value
            sec_arr.push_back((idx << 8) | in);
            in = 3;
        }
        // store in the target according to the position
        target |= in << ((idx & 3)*2);
    }
}

volatile unsigned out;

int main() {
    XorShift32 xs;
    std::vector<uint8_t> vec;
    int size = 10000000;
    for(int i = 0; i<size; ++i) {
        uint32_t v = xs();
        if(v < 1825361101)      v = 0; // 42.5%
        else if(v < 4080218931) v = 1; // 95.0%
        else if(v < 4252017623) v = 2; // 99.0%
        else {
            while((v & 0xff) < 3) v = xs();
        }
        vec.push_back(v);
    }
    populate(vec.data(), vec.size());
    mean_variance lk_t, arr_t;
    for(int i = 0; i<50; ++i) {
        {
            unsigned o = 0;
            auto beg = high_resolution_clock::now();
            for(int i = 0; i < size; ++i) {
                o += lookup(xs() % size);
            }
            out += o;
            int dur = (high_resolution_clock::now()-beg)/microseconds(1);
            fprintf(stderr, "lookup: %10d µs\n", dur);
            lk_t(dur);
        }
        {
            unsigned o = 0;
            auto beg = high_resolution_clock::now();
            for(int i = 0; i < size; ++i) {
                o += vec[xs() % size];
            }
            out += o;
            int dur = (high_resolution_clock::now()-beg)/microseconds(1);
            fprintf(stderr, "array:  %10d µs\n", dur);
            arr_t(dur);
        }
    }

    fprintf(stderr, " lookup |   ±  |  array  |   ±  | speedup\n");
    printf("%7.0f | %4.0f | %7.0f | %4.0f | %0.2f\n",
            lk_t.mean(), lk_t.stddev(),
            arr_t.mean(), arr_t.stddev(),
            arr_t.mean()/lk_t.mean());
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

(代码和数据总是在我的Bitbucket中更新)

上面的代码填充了一个10M元素数组,其随机数据分布在帖子中指定的OP,初始化我的数据结构,然后:

  • 使用我的数据结构执行10M元素的随机查找
  • 通过原始数组做同样的事情.

(请注意,在顺序查找的情况下,数组总是赢得一个巨大的衡量标准,因为它是您可以执行的最缓存友好的查找)

最后两个块重复50次并定时; 最后,计算并打印每种类型查找的平均值和标准偏差,以及加速(lookup_mean/array_mean).

-O3 -static在Ubuntu 16.04 上使用g ++ 5.4.0(以及一些警告)编译了上面的代码,并在某些机器上运行它; 他们中的大多数都在运行Ubuntu 16.04,一些是较旧的Linux,一些是较新的Linux.在这种情况下,我认为操作系统根本不相关.

            CPU           |  cache   |  lookup (µs)   |     array (µs)  | speedup (x)
Xeon E5-1650 v3 @ 3.50GHz | 15360 KB |  60011 ±  3667 |   29313 ±  2137 | 0.49
Xeon E5-2697 v3 @ 2.60GHz | 35840 KB |  66571 ±  7477 |   33197 ±  3619 | 0.50
Celeron G1610T  @ 2.30GHz |  2048 KB | 172090 ±   629 |  162328 ±   326 | 0.94
Core i3-3220T   @ 2.80GHz |  3072 KB | 111025 ±  5507 |  114415 ±  2528 | 1.03
Core i5-7200U   @ 2.50GHz |  3072 KB |  92447 ±  1494 |   95249 ±  1134 | 1.03
Xeon X3430      @ 2.40GHz |  8192 KB | 111303 ±   936 |  127647 ±  1503 | 1.15
Core i7 920     @ 2.67GHz |  8192 KB | 123161 ± 35113 |  156068 ± 45355 | 1.27
Xeon X5650      @ 2.67GHz | 12288 KB | 106015 ±  5364 |  140335 ±  6739 | 1.32
Core i7 870     @ 2.93GHz |  8192 KB |  77986 ±   429 |  106040 ±  1043 | 1.36
Core i7-6700    @ 3.40GHz |  8192 KB |  47854 ±   573 |   66893 ±  1367 | 1.40
Core i3-4150    @ 3.50GHz |  3072 KB |  76162 ±   983 |  113265 ±   239 | 1.49
Xeon X5650      @ 2.67GHz | 12288 KB | 101384 ±   796 |  152720 ±  2440 | 1.51
Core i7-3770T   @ 2.50GHz |  8192 KB |  69551 ±  1961 |  128929 ±  2631 | 1.85
Run Code Online (Sandbox Code Playgroud)

结果是......混合!

  1. 一般来说,在大多数这些机器上都有某种加速,或者至少它们是相同的.
  2. 阵列真正胜过"智能结构"查找的两种情况是在具有大量缓存而不是特别繁忙的机器上:上面的Xeon E5-1650(15 MB缓存)是夜间构建机器,此时非常空闲; Xeon E5-2697(35 MB高速缓存)是一款用于高性能计算的机器,也适用于空闲时刻.它确实有意义,原始数组完全适合它们的巨大缓存,因此紧凑的数据结构只会增加复杂性.
  3. 在"性能频谱"的另一侧 - 但是阵列再次稍微快一些,那就是为我的NAS提供动力的简陋Celeron; 它有很少的缓存,数组和"智能结构"都不适合它.具有足够小的缓存的其他机器执行类似.
  4. Xeon X5650必须小心谨慎 - 它们是一个非常繁忙的双插槽虚拟机服务器上的虚拟机; 很可能,虽然名义上它具有相当数量的缓存,但在测试期间它会被完全不相关的虚拟机多次抢占.

  • @JohnAl你不需要结构.`uint32_t`会好的.从辅助缓冲区中删除元素显然会将其排序.插入元素可以使用`std :: lower_bound`然后`insert`(而不是追加并重新排序整个事物).更新使全尺寸二级阵列更具吸引力 - 我当然会从那开始. (7认同)
  • @JohnAl因为值是`(idx << 8)+ val`你不必担心值部分 - 只需使用直接比较.**总是*比小于`((idx + 1)<< 8)+ val`并且小于`((idx-1)<< 8)+ val` (6认同)
  • 我只是为了基准测试而给这个+1.很高兴看到关于效率的问题以及多种处理器类型的结果!太好了! (6认同)
  • @JohnAl:如果这可能有用,我添加了一个`populate`函数,它应该根据`lookup`期望的格式填充`main_arr`和`sec_arr`.我实际上没有尝试过,所以不要指望它真的*工作正常:-); 无论如何,它应该给你一般的想法. (3认同)
  • @JohnAI您应该根据您的实际用例进行分析,而不是其他任何内容.白房速度无关紧要. (2认同)

650*_*502 33

另一种选择可能是

  • 检查结果是0,1还是2
  • 如果不做定期查找

换句话说,像:

unsigned char lookup(int index) {
    int code = (bmap[index>>2]>>(2*(index&3)))&3;
    if (code != 3) return code;
    return full_array[index];
}
Run Code Online (Sandbox Code Playgroud)

其中bmap每个元素使用2位,值3表示"其他".

这种结构更新微不足道,使用了25%以上的内存,但只有5%的情况下查找大部分内容.当然,像往常一样,如果这是一个好主意,取决于很多其他条件,所以唯一的答案是试验真实的用法.

  • 我说这是一个很好的折衷方案,可以获得尽可能多的缓存命中(因为减少的结构可以更容易地适应缓存),而不会在随机访问时间上损失太多. (4认同)

Mat*_*son 23

这更像是一个"长篇评论",而不是一个具体的答案

除非您的数据是众所周知的,否则我怀疑任何人都可以直接回答您的问题(我不知道任何与您的描述相符的内容,但是我不知道所有类型的数据模式的所有内容各种用例).稀疏数据是高性能计算中的常见问题,但通常"我们有一个非常大的数组,但只有一些值非零".

对于我认为不是众所周知的模式,没有人会直接知道哪个更好,这取决于细节:随机访问是多么随机 - 是系统访问数据项集群,还是完全随机的,如同统一的随机数发生器.表数据是完全随机的,还是0的序列,然后是1的序列,散布其他值?如果您有相当长的0和1序列,则运行长度编码可以正常工作,但如果您有"0/1的棋盘格",则无法运行.此外,您必须保留一个"起点"表,以便您可以合理快速地前往相关位置.

我很久以前就知道一些大型数据库只是RAM中的一个大表(本例中的电话交换用户数据),其中一个问题是处理器中的高速缓存和页表优化是无用的.呼叫者与最近呼叫某人的呼叫者很少相同,没有任何类型的预加载数据,它只是纯粹随机的.大页表是对该类访问的最佳优化.

在很多情况下,"速度和小尺寸"之间的妥协是你必须在软件工程中选择的事情之一[在其他工程中它不一定是妥协].因此,"为更简单的代码浪费内存"通常是首选.从这个意义上讲,"简单"解决方案很可能更好地提高速度,但如果你对RAM的"更好"使用,那么优化表的大小将为你提供足够的性能和良好的大小改进.有许多不同的方法可以实现这一点 - 如注释中所建议的,存储两个或三个最常见值的2位字段,然后是其他值的一些替代数据格式 - 哈希表将是我的第一种方法,但列表或二叉树也可以工作 - 再次,它取决于你的"不是0,1或2"的模式.同样,它取决于值在表中如何"分散" - 它们是在簇中还是更像是均匀分布的模式?

但问题是你还在从RAM中读取数据.然后,您将花费更多代码处理数据,包括一些代码来应对"这不是一个常见的价值".

最常见的压缩算法的问题是它们基于解包序列,因此您无法随机访问它们.将大数据分成两块(例如256个条目),将256解压缩到uint8_t数组,获取所需数据,然后丢弃未压缩数据的开销很可能不会给你带来好处表现 - 当然,假设它具有一定的重要性.

最后,您可能必须在评论/答案中实施一个或几个想法来测试,看看它是否有助于解决您的问题,或者内存总线仍然是主要的限制因素.

  • @JohnAI:如果您只是在多个线程上运行相同低效进程的多个版本,您将始终看到有限的进度.设计并行处理算法比调整存储结构要大得多. (2认同)

o11*_*11c 13

我过去所做的是在bitset 前面使用hashmap .

与Matteo的答案相比,这个空间减半,但如果"异常"查找很慢(即有很多例外),可能会更慢.

然而,通常,"缓存为王".

  • 与Matteo的回答相比,hashmap如何确定空间?那个hashmap应该是什么? (2认同)
  • @ o11c我不确定我是否理解正确.你的意思是有一个1位数组的数组,其中`0`表示_look表示`main_arr`_而`1`表示_look表示`sec_arr`_(在Matteos代码的情况下)?这需要总体上比Matteos更多的空间,因为它是一个额外的阵列.我不太明白,与Matteos的答案相比,你只会使用一半的空间. (2认同)

Jac*_*ley 11

除非您的数据存在模式,否则不太可能存在任何合理的速度或大小优化,并且 - 假设您的目标是普通计算机 - 无论如何,10 MB都不是那么大.

您的问题有两个假设:

  1. 数据存储不佳,因为您没有使用所有位
  2. 更好地存储会使事情变得更快.

我认为这两种假设都是错误的.在大多数情况下,存储数据的适当方法是存储最自然的表示.在你的情况下,这是你已经去过的那个:一个0到255之间的数字的字节.任何其他表示将更复杂,因此 - 所有其他条件相同 - 更慢,更容易出错.需要偏离这个一般原则,你需要一个比95%的数据可能有6个"浪费"的比特更强的理由.

对于您的第二个假设,如果且仅当更改阵列的大小导致更少的高速缓存未命中时,情况才会成立.是否会发生这种情况只能通过分析工作代码来确定,但我认为这不太可能产生重大影响.因为在任何一种情况下你都将随机访问数组,处理器将很难知道在哪种情况下都要缓存和保留哪些数据位.


sup*_*cat 8

如果数据和访问是均匀随机分布的,则性能可能取决于访问的哪个部分避免外部高速缓存未命中.优化它需要知道什么尺寸的阵列可以可靠地容纳在缓存中.如果你的缓存大到足以容纳每五个单元一个字节,最简单的方法可能是让一个字节保存0到2范围内的五个基三编码值(有5个值的243个组合,所以这将是适合于一个字节),以及10,000,000字节的数组,只要base-3值指示"2",就会查询该数组.

如果缓存不是那么大,但是每8个单元可以容纳一个字节,那么就不可能使用一个字节值来从8个base-3值的所有6,561种可能组合中进行选择,但是因为它的唯一影响是将0或1更改为2将导致否则不必要的查找,正确性将不需要支持所有6,561.相反,人们可以专注于256个最"有用"的值.

特别是如果0比1更常见,反之亦然,一个好的方法可能是使用217个值来编码0和1的组合,其中包含5个或更少的1,16个值来编码xxxx0000到xxxx1111,16来编码0000xxxx到1111xxxx,一个用于xxxxxxxx.对于可能找到的任何其他用途,将保留四个值.如果数据是按照描述随机分布的,那么所有查询中的绝大部分都会命中仅包含0和1的字节(在所有8个组的大约2/3中,所有位都是0和1,大约7/8那些将有六个或更少的1位); 绝大多数没有的人将落入一个包含四个x的字节中,并且有50%的几率落在零或一个上.因此,只有大约四分之一的查询需要进行大型数组查找.

如果数据是随机分布的,但是缓存不够大,无法处理每八个元素一个字节,那么可以尝试使用这种方法,每个字节处理八个以上的项目,但除非存在强烈偏向0或朝向1 ,无需在大数组中进行查找即可处理的值的分数将随着每个字节处理的数量的增加而缩小.


Mar*_*aux 7

我会补充@ o11c的答案,因为他的措辞可能有点令人困惑.如果我需要挤压最后一位和CPU周期,我会执行以下操作.

我们将从构建一个平衡的二叉搜索树开始,该树保存5%的"其他"案例.对于每次查找,您都可以快速遍历树:您有10000000个元素:其中5%在树中:因此树数据结构包含500000个元素.在O(log(n))时间内执行此操作,可以进行19次迭代.我不是这方面的专家,但我想有一些内存高效的实现.让我们猜一下:

  • 平衡树,因此可以计算子树位置(索引不需要存储在树的节点中).堆(数据结构)存储在线性存储器中的方式相同.
  • 1字节值(2到255)
  • 索引的3个字节(10000000需要23位,适合3个字节)

总计,4个字节:500000*4 = 1953 kB.适合缓存!

对于所有其他情况(0或1),您可以使用位向量.请注意,您不能忽略5%的其他随机访问情况:1.19 MB.

这两者的组合使用大约3,099 MB.使用此技术,您将节省3.08的内存因子.

然而,这并没有击败@Matteo Italia(使用2.76 MB)的答案,很可惜.有什么我们可以做的额外吗?占用大部分内存的部分是树中3个字节的索引.如果我们可以将它降低到2,我们将节省488 kB,总内存使用量将是:2.622 MB,这是更小的!

我们如何做到这一点?我们必须将索引减少到2个字节.同样,10000000需要23位.我们需要能够丢弃7位.我们可以通过将10000000个元素的范围划分为78125个元素的2 ^ 7(= 128)个区域来实现.现在我们可以为每个区域构建一个平衡树,平均有3906个元素.选择正确的树是通过将目标索引的简单划分2 ^ 7(或比特移位>> 7)来完成的.现在,存储所需的索引可以用剩余的16位表示.请注意,需要存储的树的长度有一些开销,但这可以忽略不计.还要注意,这种拆分机制减少了遍历树所需的迭代次数,现在减少了7次迭代,因为我们丢弃了7位:只剩下12次迭代.

请注意,理论上你可以重复这个过程来切断接下来的8位,但是这需要你创建2 ^ 15个平衡树,平均有~305个元素.这将导致2.143 MB,只有4次迭代来遍历树,与我们开始的19次迭代相比,这是一个相当大的加速.

最后的结论是:通过一点点的内存使用量来击败2位向量策略,但实现起来却很困难.但是,如果它可以在适应缓存之间产生差异,那么尝试可能是值得的.


Det*_*nar 5

如果您只执行读取操作,最好不将值分配给单个索引,而是分配给索引的间隔.

例如:

[0, 15000] = 0
[15001, 15002] = 153
[15003, 26876] = 2
[25677, 31578] = 0
...
Run Code Online (Sandbox Code Playgroud)

这可以使用结构来完成.如果您喜欢OO方法,您也可能想要定义类似于此的类.

class Interval{
  private:
    uint32_t start; // First element of interval
    uint32_t end; // Last element of interval
    uint8_t value; // Assigned value

  public:
    Interval(uint32_t start, uint32_t end, uint8_t value);
    bool isInInterval(uint32_t item); // Checks if item lies within interval
    uint8_t getValue(); // Returns the assigned value
}
Run Code Online (Sandbox Code Playgroud)

现在你只需要通过一个间隔列表进行迭代,并检查你的索引是否位于其中一个内容中,平均内存密集程度要低得多,但会占用更多的CPU资源.

Interval intervals[INTERVAL_COUNT];
intervals[0] = Interval(0, 15000, 0);
intervals[1] = Interval(15001, 15002, 153);
intervals[2] = Interval(15003, 26876, 2);
intervals[3] = Interval(25677, 31578, 0);
...

uint8_t checkIntervals(uint32_t item)

    for(int i=0; i<INTERVAL_COUNT-1; i++)
    {
        if(intervals[i].isInInterval(item) == true)
        {
            return intervals[i].getValue();
        }
    }
    return DEFAULT_VALUE;
}
Run Code Online (Sandbox Code Playgroud)

如果按降序大小对间隔进行排序,则会增加您要查找的项目的可能性,这会进一步降低平均内存和CPU资源使用量.

您还可以删除大小为1的所有间隔.将相应的值放入地图中,只有在间隔中找不到您要查找的项目时才检查它们.这也应该提高平均性能.

  • 有趣的想法(+1),但我有点怀疑它会证明开销是合理的,除非有很多长期的0和/或1的长期运行.实际上,您建议使用数据的行程编码.在某些情况下它可能是好的,但可能不是解决这个问题的一般方法. (4认同)