WBu*_*uck 2 c++ x86 simd visual-c++ avx2
对于我正在处理的项目,我需要计算翻录PDF图像数据中每列的设置位数。
我正在尝试获取整个PDF作业(所有页面)中每一列的总设置位数。
数据一旦被翻录,就会存储在一个MemoryMappedFile没有后备文件(在内存中)中。
PDF 页面尺寸为 13952 像素 x 15125 像素。生成的翻录数据的总大小可以通过将PDF像素的长度(高度)乘以字节的宽度来计算。翻录的数据是1 bit == 1 pixel. 因此,以字节为单位的翻录页面的大小是(13952 / 8) * 15125.
请注意,宽度始终是 的倍数64 bits。
PDF在被翻录后,我必须计算 a 的每一页(可能是数万页)中每一列的设置位。
我首先用一个基本的解决方案来解决这个问题,即循环遍历每个字节并计算设置位的数量并将结果放在vector. 从那以后,我将算法缩减为如下所示。我的执行时间从 ~350ms 到 ~120ms。
static void count_dots( )
{
    using namespace diag;
    using namespace std::chrono;
    std::vector<std::size_t> dot_counts( 13952, 0 );
    uint64_t* ptr_dot_counts{ dot_counts.data( ) };
    std::vector<uint64_t> ripped_pdf_data( 3297250, 0xFFFFFFFFFFFFFFFFUL );
    const uint64_t* ptr_data{ ripped_pdf_data.data( ) };
    std::size_t line_count{ 0 };
    std::size_t counter{ ripped_pdf_data.size( ) };
    stopwatch sw;
    sw.start( );
    while( counter > 0 )
    {
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000001UL ) >> 0;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0100000000000000UL ) >> 56;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0001000000000000UL ) >> 48;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000010000000000UL ) >> 40;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000100000000UL ) >> 32;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000001000000UL ) >> 24;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000010000UL ) >> 16;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000100UL ) >> 8;
        *ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000001UL ) >> 0;
        ++ptr_data;
        --counter;
        if( ++line_count >= 218 )
        {
            ptr_dot_counts = dot_counts.data( );
            line_count = 0;
        }
    }   
    sw.stop( );
    std::cout << sw.elapsed<milliseconds>( ) << "ms\n";
}
不幸的是,这仍然会增加很多额外的处理时间,这是不可接受的。
上面的代码很丑,不会赢得任何选美比赛,但它有助于减少执行时间。自从我写的原始版本以来,我做了以下工作:
pointers代替indexersuint64而不是uint8for循环遍历每个bit在每byte一个的uint64 bit shift而不是在屏蔽后__popcnt64计算集合bit对于这个测试,我正在生成虚假的翻录数据,其中每个数据都bit设置为1. 本dot_counts vector应包含15125每个element测试完成后。
我希望这里的一些人可以帮助我使算法的平均执行时间低于 100 毫秒。我不在乎这里的便携性。
Xeon E5-2680 v4 - IntelMSVC++ 14.23Windows 10C++17/O2 /arch:AVX2大约 8 年前有人问过一个非常相似的问题: 如何在 Sandy Bridge 上的一系列整数中快速将位计数到单独的 bin 中?
(编者注:也许你错过了在许多 64 位位掩码上分别计算每个位位置,使用 AVX 而不是 AVX2,它有一些更新更快的答案,至少对于在连续内存中沿着一列而不是沿着一行向下移动。也许您可以在一列下移动 1 或 2 个高速缓存行,以便您可以在 SIMD 寄存器中保持计数器的热度。)
当我将迄今为止的内容与公认的答案进行比较时,我非常接近。我已经在处理块uint64而不是uint8. 我只是想知道我是否还能做更多的事情,无论是使用内在函数、汇编还是简单的事情,比如更改我正在使用的数据结构。
正如标记的那样,它可以用 AVX2 来完成。
为了使这项工作正常进行,我建议vector<uint16_t>进行计数。增加计数是最大的问题,我们需要扩大的越多,问题就越大。uint16_t足以计算一页,因此您可以一次计算一页并将计数器添加到一组更宽的计数器中以获得总数。这是一些开销,但比必须在主循环中加宽要少得多。
计数的 big-endian 排序非常烦人,引入了更多的 shuffle 以使其正确。所以我建议把它弄错并稍后重新排列计数(也许是在将它们加总时?)。“先右移 7,然后是 6,然后是 5”的顺序可以免费保持,因为我们可以以任何我们想要的方式选择 64 位块的移位计数。所以在下面的代码中,计数的实际顺序是:
所以每组8个都颠倒了。(至少这是我打算做的,AVX2解包令人困惑)
代码(未测试):
while( counter > 0 )
{
    __m256i data = _mm256_set1_epi64x(*ptr_data);        
    __m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
    __m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
    data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
    data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
    __m256i zero = _mm256_setzero_si256();
    __m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
    c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
    _mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
    c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
    c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
    _mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
    c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
    c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
    _mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
    c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
    c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
    _mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
    ptr_dot_counts += 64;
    ++ptr_data;
    --counter;
    if( ++line_count >= 218 )
    {
        ptr_dot_counts = dot_counts.data( );
        line_count = 0;
    }
}
这可以进一步展开,一次处理多行。这很好,因为正如前面提到的,对计数器求和是最大的问题,按行展开会做的更少,在寄存器中求和更简单。
使用了一些内在函数:
_mm256_set1_epi64x,int64_t将向量的 64 位元素中的1 个复制到所有 4 个元素。也很好uint64_t。_mm256_set_epi64x, 将 4 个 64 位值转换为向量。_mm256_srlv_epi64, 逻辑右移,计数可变(每个元素可以是不同的计数)。_mm256_and_si256,只是按位与。_mm256_add_epi16,此外,适用于 16 位元素。_mm256_unpacklo_epi8并且_mm256_unpackhi_epi8,可能最好通过该页面上的图表来解释可以“垂直”求和,使用一个uint64_t来保存 64 个单独和的所有第 0 位,另一个uint64_t保存和的所有第 1 位等。可以通过模拟全加器(电路组件)来完成加法按位算术。然后,不是只向计数器添加 0 或 1,而是一次性添加更大的数字。
垂直总和也可以矢量化,但这会显着增加将垂直总和添加到列总和的代码,所以我没有在这里这样做。它应该会有所帮助,但这只是大量代码。
示例(未测试):
size_t y;
// sum 7 rows at once
for (y = 0; (y + 6) < 15125; y += 7) {
    ptr_dot_counts = dot_counts.data( );
    ptr_data = ripped_pdf_data.data( ) + y * 218;
    for (size_t x = 0; x < 218; x++) {
        uint64_t dataA = ptr_data[0];
        uint64_t dataB = ptr_data[218];
        uint64_t dataC = ptr_data[218 * 2];
        uint64_t dataD = ptr_data[218 * 3];
        uint64_t dataE = ptr_data[218 * 4];
        uint64_t dataF = ptr_data[218 * 5];
        uint64_t dataG = ptr_data[218 * 6];
        // vertical sums, 7 bits to 3
        uint64_t abc0 = (dataA ^ dataB) ^ dataC;
        uint64_t abc1 = (dataA ^ dataB) & dataC | (dataA & dataB);
        uint64_t def0 = (dataD ^ dataE) ^ dataF;
        uint64_t def1 = (dataD ^ dataE) & dataF | (dataD & dataE);
        uint64_t bit0 = (abc0 ^ def0) ^ dataG;
        uint64_t c1   = (abc0 ^ def0) & dataG | (abc0 & def0);
        uint64_t bit1 = (abc1 ^ def1) ^ c1;
        uint64_t bit2 = (abc1 ^ def1) & c1 | (abc1 & def1);
        // add vertical sums to column counts
        __m256i bit0v = _mm256_set1_epi64x(bit0);
        __m256i data01 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(4, 6, 5, 7));
        __m256i data02 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(0, 2, 1, 3));
        data01 = _mm256_and_si256(data01, _mm256_set1_epi8(1));
        data02 = _mm256_and_si256(data02, _mm256_set1_epi8(1));
        __m256i bit1v = _mm256_set1_epi64x(bit1);
        __m256i data11 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(4, 6, 5, 7));
        __m256i data12 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(0, 2, 1, 3));
        data11 = _mm256_and_si256(data11, _mm256_set1_epi8(1));
        data12 = _mm256_and_si256(data12, _mm256_set1_epi8(1));
        data11 = _mm256_add_epi8(data11, data11);
        data12 = _mm256_add_epi8(data12, data12);
        __m256i bit2v = _mm256_set1_epi64x(bit2);
        __m256i data21 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(4, 6, 5, 7));
        __m256i data22 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(0, 2, 1, 3));
        data21 = _mm256_and_si256(data21, _mm256_set1_epi8(1));
        data22 = _mm256_and_si256(data22, _mm256_set1_epi8(1));
        data21 = _mm256_slli_epi16(data21, 2);
        data22 = _mm256_slli_epi16(data22, 2);
        __m256i data1 = _mm256_add_epi8(_mm256_add_epi8(data01, data11), data21);
        __m256i data2 = _mm256_add_epi8(_mm256_add_epi8(data02, data12), data22);
        __m256i zero = _mm256_setzero_si256();
        __m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
        c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
        c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
        c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
        c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
        ptr_dot_counts += 64;
        ++ptr_data;
    }
}
// leftover rows
for (; y < 15125; y++) {
    ptr_dot_counts = dot_counts.data( );
    ptr_data = ripped_pdf_data.data( ) + y * 218;
    for (size_t x = 0; x < 218; x++) {
        __m256i data = _mm256_set1_epi64x(*ptr_data);
        __m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
        __m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
        data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
        data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
        __m256i zero = _mm256_setzero_si256();
        __m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
        c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
        c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
        c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
        c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
        c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
        ptr_dot_counts += 64;
        ++ptr_data;
    }
}
到目前为止,第二好的方法是一种更简单的方法,更像第一个版本,除了yloopLen一次运行行以利用快速 8 位求和:
size_t yloopLen = 32;
size_t yblock = yloopLen * 1;
size_t yy;
for (yy = 0; yy < 15125; yy += yblock) {
    for (size_t x = 0; x < 218; x++) {
        ptr_data = ripped_pdf_data.data() + x;
        ptr_dot_counts = dot_counts.data() + x * 64;
        __m256i zero = _mm256_setzero_si256();
        __m256i c1 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
        __m256i c2 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
        __m256i c3 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
        __m256i c4 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
        size_t end = std::min(yy + yblock, size_t(15125));
        size_t y;
        for (y = yy; y < end; y += yloopLen) {
            size_t len = std::min(size_t(yloopLen), end - y);
            __m256i count1 = zero;
            __m256i count2 = zero;
            for (size_t t = 0; t < len; t++) {
                __m256i data = _mm256_set1_epi64x(ptr_data[(y + t) * 218]);
                __m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
                __m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
                data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
                data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
                count1 = _mm256_add_epi8(count1, data1);
                count2 = _mm256_add_epi8(count2, data2);
            }
            c1 = _mm256_add_epi16(_mm256_unpacklo_epi8(count1, zero), c1);
            c2 = _mm256_add_epi16(_mm256_unpackhi_epi8(count1, zero), c2);
            c3 = _mm256_add_epi16(_mm256_unpacklo_epi8(count2, zero), c3);
            c4 = _mm256_add_epi16(_mm256_unpackhi_epi8(count2, zero), c4);
        }
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c1);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c2);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c3);
        _mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c4);
    }
}
之前有一些测量问题,最后这实际上并没有更好,但也不比上面更高级的“垂直总和”版本差多少。