Luk*_*vec 5 c++ gcc simd vectorization bitmap-index
我有一个相当简单的循环:
auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
rowsFound += equals;
indexRecord += byteSize; // byteSize is 7
}
Run Code Online (Sandbox Code Playgroud)
哪里BitString::equals
:
static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}
Run Code Online (Sandbox Code Playgroud)
该代码用于模拟数据库中的位图索引查询。我的问题是,是否有一种方法可以矢量化循环,遍历所有记录。当尝试使用 GCC 进行编译时,-fopt-info-vec-missed -O3
我得到:missed: couldn't vectorize loop
.
我对这种优化很陌生,想了解更多,只是感觉我错过了一些东西。
编辑
首先,谢谢大家的回答。我应该包括一个 Reprex。现在就在这里,拥有所需的所有功能,我已经尽可能接近了。所有这些都是在x86-64
平台上完成的,我有 GCC 和 Clang 可用。
#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>
constexpr short BYTE_SIZE = 8;
class BitString {
public:
static int getByteSizeFromBits(int bitSize) {
return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
}
static void setBitString(char *rec, int bitOffset) {
rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
}
static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
}
};
// Class representing a table schema
class TableSchema {
public:
// number of attributes of a table
unsigned int attrs_count = -1;
// the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
unsigned int *attr_sizes = nullptr;
// max value (domain) of an attribute, -1 for unlimited, ()
int *attr_max_values = nullptr;
// the offset of each attribute, to simplify some pointer arithmetic for further use
unsigned int *attribute_offsets = nullptr;
// sum of attr_sizes if the record size;
unsigned int record_size = -1;
void calculate_offsets() {
if (attrs_count <= 0 || attribute_offsets != nullptr) {
return;
}
attribute_offsets = new unsigned int[attrs_count];
int offset = 0;
for (int i = 0; i < attrs_count; ++i) {
attribute_offsets[i] = offset;
offset += attr_sizes[i];
}
record_size = offset;
}
TableSchema() = default;
~TableSchema() {
if (attribute_offsets != nullptr) {
delete[] attribute_offsets;
attribute_offsets = nullptr;
}
attrs_count = -1;
}
};
class BitmapIndex {
private:
char *mData = nullptr;
short bitSize = 0;
int byteSize = 0;
int attrsCount = 0;
int *attrsMaxValue = nullptr;
int *bitIndexAttributeOffset = nullptr;
unsigned int recordCount = 0;
char *SelectMask;
unsigned int capacity = 0;
inline char *getRowPointer(unsigned int rowId) const {
return mData + rowId * byteSize;
}
inline bool shouldColBeIndexed(int max_col_value) const {
return max_col_value > 0;
}
public:
BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
auto maxValuesSum = 0;
attrsMaxValue = new int[attrs_count];
attrsCount = attrs_count;
bitIndexAttributeOffset = new int[attrs_count];
auto bitOffset = 0;
// attribute's max value is the same as number of bits used to encode the current value
// e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
for (int i = 0; i < attrs_count; ++i) {
attrsMaxValue[i] = attrs_max_value[i];
bitIndexAttributeOffset[i] = bitOffset;
// col is indexed only if it's max value is > 0, -1 means
if (!shouldColBeIndexed(attrs_max_value[i]))
continue;
maxValuesSum += attrs_max_value[i];
bitOffset += attrs_max_value[i];
}
bitSize = (short) maxValuesSum;
byteSize = BitString::getByteSizeFromBits(bitSize);
mData = new char[byteSize * capacity];
memset(mData, 0, byteSize * capacity);
SelectMask = new char[byteSize];
this->capacity = capacity;
}
~BitmapIndex() {
if (mData != nullptr) {
delete[] mData;
mData = nullptr;
delete[] attrsMaxValue;
attrsMaxValue = nullptr;
delete[] SelectMask;
SelectMask = nullptr;
}
}
unsigned long getTotalByteSize() const {
return byteSize * capacity;
}
// add record to index
void addRecord(const char * record, const unsigned int * attribute_sizes) {
auto indexRecord = getRowPointer(recordCount);
unsigned int offset = 0;
for (int j = 0; j < attrsCount; ++j) {
if (attrsMaxValue[j] != -1) {
// byte col value
char colValue = *(record + offset);
if (colValue > attrsMaxValue[j]) {
throw std::runtime_error("Col value is bigger than max allowed value!");
}
// printf("%d ", colValue);
BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
}
offset += attribute_sizes[j];
}
recordCount += 1;
}
// SELECT COUNT(*)
int Select(const char *query) const {
uint64_t rowsFound = 0;
memset(SelectMask, 0, byteSize);
for (int col = 0; col < attrsCount; ++col) {
if (!shouldColBeIndexed(attrsMaxValue[col])) {
continue;
}
auto col_value = query[col];
if (col_value < 0) {
for (int i = 0; i < attrsMaxValue[col]; ++i) {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
}
} else {
BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
}
}
uint64_t maxBytesValue = 0;
uint64_t byteVals = 0xff;
for (int i = 0; i < byteSize; ++i) {
maxBytesValue |= byteVals << (i * 8);
}
auto indexRecord = getRowPointer(0);
for (int i = 0; i < recordCount; ++i) {
rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
indexRecord += byteSize;
}
return rowsFound;
}
};
void generateRecord(
char *record,
const unsigned int attr_sizes[],
const int attr_max_value[],
int attr_count
) {
auto offset = 0;
for (int c = 0; c < attr_count; ++c) {
if (attr_max_value[c] == -1) {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % 256;
}
} else {
for (int j = 0; j < attr_sizes[c]; ++j) {
record[offset + j] = rand() % attr_max_value[c];
}
}
offset += attr_sizes[c];
}
}
int main() {
TableSchema schema;
const int attribute_count = 13;
const int record_count = 1000000;
// for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
schema.attrs_count = attribute_count;
schema.attr_sizes = attr_sizes;
schema.attr_max_values = attr_max_values;
schema.calculate_offsets();
srand((unsigned ) time(nullptr));
BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);
char *record = new char[schema.record_size];
for (int i = 0; i < record_count; ++i) {
// generate some random records and add them to the index
generateRecord(record, attr_sizes, attr_max_values, attribute_count);
bitmapIndex.addRecord(record, attr_sizes);
}
char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
// simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
auto found = bitmapIndex.Select(query);
printf("Query found: %d records\n", found);
delete[] record;
return 0;
}
Run Code Online (Sandbox Code Playgroud)
如果记录大小为 8,GCC 和 Clang 都会自动向量化,例如:(希望能够充分代表代码发生的实际上下文)
int count(char * indexRecord, const char * SelectMask, uint64_t maxVal)
{
bool equals;
uint64_t rowsFound = 0;
// some arbitrary number of records
for (int i = 0; i < 1000000; ++i) {
equals = tequals(SelectMask, indexRecord, maxVal);
rowsFound += equals;
indexRecord += 8; // record size padded out to 8
}
return rowsFound;
}
Run Code Online (Sandbox Code Playgroud)
它的重要部分由 GCC 编译,如下所示:
.L4:
vpand ymm0, ymm2, YMMWORD PTR [rdi]
add rdi, 32
vpcmpeqq ymm0, ymm0, ymm3
vpsubq ymm1, ymm1, ymm0
cmp rax, rdi
jne .L4
Run Code Online (Sandbox Code Playgroud)
不错。它使用与我手动使用相同的想法:vpand
带有掩码的数据(按位逻辑的简化),将其与零进行比较,从 4 个计数器中减去比较结果(减去,因为 True 结果用 -1 表示)包装在一个向量中。四个单独的计数在循环后相加。
顺便说一下,请注意我做了rowsFound
一个uint64_t
. 这很重要。如果rowsFound
不是 64 位,那么 Clang 和 GCC 都会非常努力地尽快缩小计数范围,这与好方法完全相反:这会在循环中花费更多指令,并且没有任何好处。如果计数最终打算是 32 位 int,则可以在循环后简单地缩小它的范围,这可能不仅便宜,而且实际上是免费的。
使用 SIMD 内在函数手动编写与该代码等效的东西并不困难,这可以使代码不那么脆弱(它不会基于希望编译器会做正确的事情),但它不适用于非x86 平台不再是了。
如果记录应该是 7 字节,那就是一个更烦人的问题了。GCC 放弃了,Clang 实际上继续进行自动向量化,但这并不好:8 字节加载都是单独完成的,然后将结果放在一个向量中,这都是很大的时间浪费。
当使用 SIMD 内在函数手动执行此操作时,主要问题是将 7 字节记录解包到 qword 通道中。SSE4.1 版本可以使用pshufb
(pshufb
来自 SSSE3,但pcmpeqq
来自 SSE4.1,因此以 SSE4.1 为目标是有意义的)来做到这一点,很简单。AVX2 版本可以在尝试加载的第一个记录之前 2 个字节开始加载,这样 256 位寄存器的两个 128 位半部之间的“分割”就落在两个记录之间。然后vpshufb
,无法将字节从一个 128 位一半移动到另一半,但仍然可以将字节移动到位,因为它们都不需要交叉到另一半。
例如,具有手动矢量化和 7 字节记录的 AVX2 版本可能如下所示。这需要在结尾和开头都进行一些填充,或者在命中最后一条记录之前跳过第一个记录和结尾并单独处理它们。未经测试,但它至少可以让您了解手动矢量化的代码如何工作。
int count(char * indexRecord, uint64_t SelectMask, uint64_t maxVal)
{
__m256i mask = _mm256_set1_epi64x(~SelectMask & maxVal);
__m256i count = _mm256_setzero_si256();
__m256i zero = _mm256_setzero_si256();
__m256i shufmask = _mm256_setr_epi8(2, 3, 4, 5, 6, 7, 8, -1, 9, 10, 11, 12, 13, 14, 15, -1, 0, 1, 2, 3, 4, 5, 6, -1, 7, 8, 9, 10, 11, 12, 13, -1);
for (int i = 0; i < 1000000; ++i) {
__m256i records = _mm256_loadu_si256((__m256i*)(indexRecord - 2));
indexRecord += 7 * 4;
records = _mm256_shuffle_epi8(records, shufmask);
__m256i isZero = _mm256_cmpeq_epi64(_mm256_and_si256(records, mask), zero);
count = _mm256_sub_epi64(count, isZero);
}
__m128i countA = _mm256_castsi256_si128(count);
__m128i countB = _mm256_extracti128_si256(count, 1);
countA = _mm_add_epi64(countA, countB);
return _mm_cvtsi128_si64(countA) + _mm_extract_epi64(countA, 1);
}
Run Code Online (Sandbox Code Playgroud)