gri*_*fin 5 c optimization memory-alignment sse2 avx2
在我的代码中,我必须处理websocket数据包的"取消屏蔽",这实际上意味着对任意长度的未对齐数据进行异或.感谢SO(Websocket数据取消屏蔽/多字节xor)我已经发现如何(希望)使用SSE2/AVX2扩展加速这一点,但现在看一下,在我看来,我对未对齐数据的处理完全是sub -最佳.有没有办法优化我的代码或者至少使它具有相同的性能更简单,或者我的代码是否已经表现最佳?
这是代码的重要部分(对于这个问题,我假设数据总是至少足以运行一次AVX2循环,但同时它最多只能运行几次):
// circular shift left for uint32
int cshiftl_u32(uint32_t num, uint8_t shift) {
return (num << shift) | (num >> (32 - shift));
}
// circular shift right for uint32
int cshiftr_u32(uint32_t num, uint8_t shift) {
return (num >> shift) | (num << (32 - shift));
}
void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
if (ds == de) return; // zero data len -> nothing to do
uint8_t maskOffset = 0;
// process single bytes till 4 byte alignment ( <= 3 )
for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
*ds ^= *((uint8_t *)(&mask) + maskOffset);
maskOffset = (maskOffset + 1) & (uint8_t)3;
}
if (ds == de) return; // done, return
if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions
mask = cshiftl_u32(mask, maskOffset);
maskOffset = 0;
}
// process 4 byte block till 8 byte alignment ( <= 1 )
uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31));
if ( ds < de32 && ( (uint64_t)de & (uint64_t)7 ) ) {
*(uint32_t *)ds ^= mask; // mask is uint32_t
if (++ds == de) return;
}
// process 8 byte block till 16 byte alignment ( <= 1 )
uint64_t mask64 = mask | (mask << 4);
uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63));
if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
*(uint64_t *)ds ^= mask64;
if (++ds == de) return; // done, return
}
// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2
__m128i v128, v128_mask;
v128_mask = _mm_set1_epi32(mask);
uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127));
if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
v128 = _mm_load_si128((__m128i *)ds);
v128 = _mm_xor_si128(v128, v128_mask);
_mm_store_si128((__m128i *)ds, v128);
if (++ds == de) return; // done, return
}
#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
__m256i v256, v256_mask;
v256_mask = _mm256_set1_epi32(mask);
uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255));
for (; ds < de256; ds+=32) {
v256 = _mm256_load_si256((__m256i *)ds);
v256 = _mm256_xor_si256(v256, v256_mask);
_mm256_store_si256((__m256i *)ds, v256);
}
if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
for (; ds < de128; ds+=16) {
v128 = _mm_load_si128((__m128i *)ds);
v128 = _mm_xor_si128(v128, v128_mask);
_mm_store_si128((__m128i *)ds, v128);
}
if (ds == de) return; // done, return
#endif
// process remaining 8 byte blocks
// this should always be supported, so remaining can be assumed to be executed <= 1 times
for (; ds < de64; ds += 8) {
*(uint64_t *)ds ^= mask64;
}
if (ds == de) return; // done, return
// process remaining 4 byte blocks ( <= 1)
if (ds < de32) {
*(uint32_t *)ds ^= mask;
if (++ds == de) return; // done, return
}
// process remaining bytes ( <= 3)
for (; ds < de; ds ++) {
*ds ^= *((uint8_t *)(&mask) + maskOffset);
maskOffset = (maskOffset + 1) & (uint8_t)3;
}
}
Run Code Online (Sandbox Code Playgroud)
PS:请忽略使用#ifdef而不是cpuid等来进行cpu标志检测.
与手册中所说的不同,大多数英特尔处理器实际上非常擅长处理未对齐的数据。由于您使用英特尔的编译器内置函数进行向量处理,我假设您可以访问最新版本的icc.
如果您不能自然地对齐数据,那么恐怕您正在做的事情已经接近达到最大性能了。为了使代码在 Xeon Phi(64 字节向量寄存器)/未来更长的向量处理器上更具可读性和可部署性,我建议您开始使用Intel Cilk Plus。
例子:
void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
while (length & 0x3) {
*(d++) ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
length--;
}
// switch to 4 bytes per block
uint32_t _d = d;
length >>= 2;
// Intel Cilk Plus Array Notation
// Should expand automatically to the best possible SIMD instructions
// you are compiling for
_d[0:length] ^= mask;
}
Run Code Online (Sandbox Code Playgroud)
请注意,我没有测试此代码,因为我现在无法访问英特尔编译器。如果您遇到问题,那么我下周回到办公室时可以解决它。
如果您更喜欢内在函数,那么正确使用预处理器宏可以显着简化您的生活:
#if defined(__MIC__)
// intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask))
#elif defined(CPU_SSE2)
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm128_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#fi
void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
size_t i;
// there really is no point in having extra
// branches for different vector lengths if they are
// executed at most once
// branch prediction is your friend here
// so we do one byte at a time until the block size
// is reached
while (length && (d & (VECTOR_BLOCKSIZE - 1))) {
*(d++) ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
length--;
}
my_vector_t * d_vector = (my_vector_t *)d;
my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);
size_t vector_legth = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift
length &= VECTOR_BLOCKSIZE -1; // remaining length
for (i = 0; i < vector_legth; i++) {
VECTOR_XOR(d_vector + i, vector_mask);
}
// process the tail
d = (uint8_t*)(d_vector + i);
for (i = 0; i < length; i++) {
d[i] ^= mask;
asm ("rold $8, %0" : "+g" (mask) :: "cc");
}
}
Run Code Online (Sandbox Code Playgroud)
另请注意:您可能希望使用 x86 旋转指令而不是移位来进行旋转mask:
#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")
Run Code Online (Sandbox Code Playgroud)