C++中的保持顺序的memcpy

Ken*_*man 8 c++ x86 arm memcpy lock-free

我正在开发一个多核,多线程软件库,我想在其中提供可能跨越多个缓存行的更新顺序保留无锁共享内存对象.

具体来说,假设我有一些高速缓存行大小的对象的向量X:X [0],... X [K]每个占用恰好一个高速缓存行.我按索引顺序写入它们:首先是X [0],然后是X [1],等等.如果线程2读取X [K],它还会看到X [0]的状态是"至少是当前的"正如它看到的X [K]?

从同一个线程,显然我会看到尊重更新顺序的内存语义.但是现在如果某个第二个线程读取X [K]则会出现问题:是否会观察到对X [0] ...... X [K-1]的相应更新?

通过锁定,我们可以获得此保证.但是由于memcpy用于将某些东西复制到向量中,我们失去了这个属性:memcpy有一个POSIX语义,它不保证索引顺序更新或内存顺序更新或任何其他排序.您可以确保在memcpy完成后,已执行整个更新.

我的问题:是否已经有一个保持订单的memcpy具有相似的速度但具有所需的保证?如果没有,可以在没有锁定的情况下实现这样的原语吗?

假设我的目标平台是x86和ARM.

(编者注:最初称英特尔,所以OP可能不关心AMD.)

Pet*_*des 7

您描述的排序要求正是发布/获取语义所提供的.(http://preshing.com/20120913/acquire-and-release-semantics/).

问题是有效保证原子加载/存储的原子性单位在所有x86和某些ARM上最多为8个字节.否则只有4个字节在其他ARM上. (为什么在x86上对自然对齐的变量进行整数赋值?).一些英特尔CPU可能实际上拥有原子32甚至64字节(AVX512)存储,但英特尔和AMD都没有提供任何官方保证.

我们甚至不知道SIMD向量存储是否有保证顺序,因为它们可能会将宽对齐存储分成多个8字节对齐的块.或者即使这些块是单独的原子. 向量加载/存储和收集/分散的每元素原子性? 有充分的理由相信它们是按元素原子的,即使文档不能保证它.

如果拥有大型"对象"对性能至关重要,那么您可以考虑在您关心的特定服务器上测试向量加载/存储原子性,但是只要保证并让编译器使用它就完全靠您自己.(有内在函数.)确保在不同套接字的内核之间进行测试,以捕获SSE指令之类的情况:哪些CPU可以进行原子16B内存操作?由于K10 Opteron上的插座之间的HyperTransport,在8字节边界处撕裂.这可能是一个非常糟糕的主意; 你无法猜测,如果任何微架构条件在极少数情况下可以使宽矢量存储非原子,即使它通常看起来像是原子的.


您可以轻松地为数组元素发布/获取排序
alignas(64) atomic<uint64_t> arr[1024];.
你只需要很好地询问编译器:

copy_to_atomic(std::atomic<uint64_t> *__restrict dst_a, 
                      const uint64_t *__restrict src, size_t len) {
    const uint64_t *endsrc = src+len;
    while (src < src+len) {
        dst_a->store( *src, std::memory_order_release );
        dst_a++; src++;
    }
}
Run Code Online (Sandbox Code Playgroud)

在x86-64上它不会自动矢量化或任何东西,因为编译器不优化原子,并且因为没有文档可以安全地使用向量来存储原子元素数组的连续元素.:(所以这基本上很糟糕. 在Godbolt编译器浏览器上看到它

我会考虑使用volatile __m256i*指针(对齐的加载/存储)和编译器障碍atomic_thread_fence(std::memory_order_release)来阻止编译时重新排序.每个元素的排序/原子性应该没问题(但不能保证).并且绝对不要指望整个32字节是原子的,只是uint64_t在较低uint64_t元素之后写入更高的元素(并且那些存储按此顺序对其他核心可见).


在ARM32上:即使是a的原子存储uint64_t也不是很好.gcc使用ldrexd/ strexdpair(LL/SC),因为显然没有8字节的原子纯存储.(我使用gcc7.2 -O3 -march = armv7-a编译.在AArch32模式下使用armv8-a,存储对是原子的.当然,AArch64也有原子8字节加载/存储.)


您必须避免使用正常的C库memcpy实现. 在x86上,它可以使用弱排序的存储来存储大型副本,允许在它自己的存储之间进行重新排序(但不能用于以后不属于它的存储memcpy,因为这可能会破坏以后的发布存储.)

movnt高速缓存绕过存储在向量循环中,或rep movsb在具有ERMSB功能的CPU上,都可以创建此效果. 英特尔内存模型是否使SFENCE和LFENCE冗余?.

或者memcpy实现可以在进入其主循环之前首先选择最后(部分)向量.

atomic在C和C++中对UB中的非类型进行并发写+读或写+写; 这就是为什么memcpy有这么多自由做任何想做的事情,包括使用弱排序的存储,只要它sfence必要时使用以确保memcpy整体尊重编译器在为以后的mo_release操作发出代码时所期望的顺序.

(即x86的当前C++实现std::atomic假设没有弱排序的存储让他们担心.任何希望他们的NT存储尊重编译器生成的atomic<T>代码的顺序的代码必须使用_mm_sfence().或者如果用手写asm ,sfence指令直接.或者只是使用,xchg如果你想做一个顺序发布商店,并给你的asm功能atomic_thread_fence(mo_seq_cst)也一样.)

  • 关于_ordering_与_atomicity_,这个问题提出了有趣的观点.特别是,OP从不要求原子性:他要求保证当观察到后续存储(如`X [1]`)时,之前存储的位置(如`X [0]`)将至少为_最近_.我相信x86内存订购模型可以保证这一点,即使是宽SIMD加载和存储也是如此.也就是说,我的排序保证不应该(不是?)限制为原子访问.特别是,这似乎可以保证在"一次写入"的情况下,即使是广泛的商店...... (2认同)

Joe*_*ger 1

我发现Peter Cordes对这个问题的回答富有洞察力、详细且非常有帮助。然而,我没有看到他的建议被写入代码中,因此对于需要快速解决 DMA 或无锁算法的有序写入问题的后代和未来的人们,我包含了我根据该答案编写的代码。我在 x64 和 armv7-a 上使用 gcc 4.9 构建它,尽管我只在 x64 上运行并测试了它。

#include <atomic>
#include <stdlib.h>
#include <algorithm> // min

extern "C" {

static void * linear_memcpy_portable(void *__restrict dest, const void *__restrict src, size_t n)
{
   // Align dest if not already aligned
   if ((uintptr_t)dest & sizeof(uint64_t)) {
      uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dest);
      const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src);
      const size_t align_n = std::min(n, (uintptr_t)dest & sizeof(uint64_t));
      const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + align_n;
      while (src8 < endsrc8) {
         *dst8 = *src8;
         atomic_thread_fence(std::memory_order_release);
         dst8++; src8++;
      }
      dest = dst8;
      src = src8;
      n = n - align_n;
   }
   typedef uint64_t __attribute__((may_alias,aligned(1))) aliasing_unaligned_uint64_t;
   uint64_t *__restrict dst64 = static_cast<uint64_t *__restrict>(dest);
   const aliasing_unaligned_uint64_t *__restrict src64 = static_cast<const aliasing_unaligned_uint64_t *__restrict>(src);
   const uint64_t * const endsrc64 = src64 + n / sizeof(uint64_t);
   const uint8_t * const endsrc8 = static_cast<const uint8_t * const>(src) + n;
   while (src64 < endsrc64) {
      *dst64 = *src64;
      atomic_thread_fence(std::memory_order_release);
      dst64++; src64++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc64) != endsrc8) {
      uint8_t *__restrict dst8 = reinterpret_cast<uint8_t *__restrict>(dst64);
      const uint8_t *__restrict src8 = reinterpret_cast<const uint8_t *__restrict>(src64);
      while (src8 < endsrc8) {
         *dst8 = *src8;
         atomic_thread_fence(std::memory_order_release);
         dst8++; src8++;
      }
   }
   return dest;
}

#if (_M_AMD64 || __x86_64__)
#include <immintrin.h>
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n) __attribute__((target("avx2")));
static void * linear_memcpy_avx2(void *dest, const void * src, size_t n)
{
   __m256i *__restrict dst256 = static_cast<__m256i *__restrict>(dest);
   const __m256i *__restrict src256 = static_cast<const __m256i *__restrict>(src);
   const __m256i * const endsrc256 = src256 + n / sizeof(__m256i);
   const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
   while (src256 < endsrc256) {
      _mm256_storeu_si256(dst256, _mm256_loadu_si256(src256));
      atomic_thread_fence(std::memory_order_release);
      dst256++; src256++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc256) != endsrc8)
      linear_memcpy_portable(dst256, src256, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc256));
   return dest;
}

static void * linear_memcpy_sse2(void *dest, const void * src, size_t n) __attribute__((target("sse2")));
static void * linear_memcpy_sse2(void *dest, const void * src, size_t n)
{
   __m128i *__restrict dst128 = static_cast<__m128i *__restrict>(dest);
   const __m128i *__restrict src128 = static_cast<const __m128i *__restrict>(src);
   const __m128i * const endsrc128 = src128 + n / sizeof(__m128i);
   const uint8_t * const endsrc8 = static_cast<const uint8_t *>(src) + n;
   while (src128 < endsrc128) {
      _mm_storeu_si128(dst128, _mm_loadu_si128(src128));
      atomic_thread_fence(std::memory_order_release);
      dst128++; src128++;
   }
   if (reinterpret_cast<const uint8_t * const>(endsrc128) != endsrc8)
      linear_memcpy_portable(dst128, src128, endsrc8 - reinterpret_cast<const uint8_t * const>(endsrc128));
   return dest;
}

static void *(*resolve_linear_memcpy(void))(void *, const void *, size_t)
{
   __builtin_cpu_init();
   // All x64 targets support a minimum of SSE2
   return __builtin_cpu_supports("avx2") ? linear_memcpy_avx2 : linear_memcpy_sse2;
}
#ifdef __AVX2__
// IF AVX2 is specified to the compiler, alias to the avx2 impl so it can be inlined
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_avx2")));
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((ifunc("resolve_linear_memcpy")));
#endif
#else
void * linear_memcpy(void *, const void *, size_t) __attribute__((alias("linear_memcpy_portable")));
#endif

} // extern "C"
Run Code Online (Sandbox Code Playgroud)

我欢迎任何有关实施的反馈。:)