Ring Allocator用于Lockfree更新成员变量?

alf*_*out 6 c++ performance lock-free dynamic-memory-allocation data-structures

我有一个类存储一些传入的实时数据的最新值(大约1.5亿事件/秒).

假设它看起来像这样:

class DataState 
{
    Event latest_event;

  public:
  //pushes event atomically
  void push_event(const Event __restrict__* e);
  //pulls event atomically
  Event pull_event();
};
Run Code Online (Sandbox Code Playgroud)

我需要能够以原子方式推送事件并使用严格的排序保证来拉动它们.现在,我知道我可以使用自旋锁,但鉴于大量事件率(超过1亿/秒)和高度并发性,我更喜欢使用无锁操作.

问题是Event大小为64字节.CMPXCHG64B目前的X86 CPU 没有任何指令(截至2016年8月).因此,如果我使用std::atomic<Event>我必须链接到libatomic使用互联网下的互斥量(太慢).

所以我的解决方案是以原子方式交换指向值的指针.问题是动态内存分配成为这些事件率的瓶颈.所以...我定义了一些我称之为"环分配器"的东西:

/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
  T *arena;
  std::atomic_size_t arena_idx;
  const std::size_t arena_size;
 public:
  /// @brief Creates a new RingAllocator
  /// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
  RingAllocator<T>(std::size_t size) : arena_size(size)
  {
  //allocate pool
  arena = new T[size];
  //zero out pool
  std::memset(arena, 0, sizeof(T) * size);
  arena_idx = 0;
  }

  ~RingAllocator()
  {
  delete[] arena;
  }

  /// @brief Return next element's pointer. Thread-safe
  /// @return pointer to next available element
  T *get_next()
  {
      return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
  }
};
Run Code Online (Sandbox Code Playgroud)

然后我可以让我的DataState类看起来像这样:

class DataState 
{
    std::atomic<Event*> latest_event;
    RingAllocator<Event> event_allocator;
  public:
  //pushes event atomically
  void push_event(const Event __restrict__* e)
  {
      //store event
      Event *new_ptr = event_allocator.get_next()
      *new_ptr = *e;
      //swap event pointers
      latest_event.store(new_ptr, std::memory_order_release);
  }
  //pulls event atomically
  Event pull_event()
  {
      return *(latest_event.load(std::memory_order_acquire));
  }
};
Run Code Online (Sandbox Code Playgroud)

只要我将环分配器的大小调整为可以同时调用函数的最大线程数,就不存在覆盖pull_event可能返回的数据的风​​险.加上一切都是超级本地化的,因此间接不会导致错误的缓存性能.这种方法有任何可能的陷阱吗?

Pet*_*des 2

班上DataState

我以为它会是一个堆栈或队列,但事实并非如此,所以push/pull似乎不是一个好的方法名称。(否则实现完全是假的)。

它只是一个锁存器,可让您读取任何线程存储的最后一个事件。

没有什么可以阻止连续两次写入覆盖从未读取过的元素。也没有什么可以阻止您两次阅读同一元素。

如果您只需要在某个地方复制小数据块,那么环形缓冲区似乎是一个不错的方法。但如果你不想失去事件,我认为你不能这样使用它。相反,只需获取一个环形缓冲区条目,然后复制到其中并在那里使用它。因此,唯一的原子操作应该是递增环形缓冲区位置索引。


环形缓冲区

你可以变得get_next()更有效率。此行执行原子后递增 (fetch_add) 和原子交换:

return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
Run Code Online (Sandbox Code Playgroud)

我什至不确定它是否安全,因为 xchg 可能会从另一个线程踩到 fetch_add 。无论如何,即使安全,也并不理想。

你不需要那个。确保 arena_size 始终是 2 的幂,然后就不需要对共享计数器取模。您可以放开它,让每个线程对其取模以供自己使用。它最终会换行,但它是一个二进制整数,因此它将以 2 的幂换行,这是竞技场大小的倍数。

我建议存储 AND 掩码而不是大小,这样就不存在编译%为指令以外的任何内容的风险and,即使它不是编译时常量。这可以确保我们避免使用 64 位整数div指令。

template<typename T> class RingAllocator {
  T *arena;
  std::atomic_size_t arena_idx;
  const std::size_t size_mask;   // maybe even make this a template parameter?
 public:
  RingAllocator<T>(std::size_t size) 
    : arena_idx(0),  size_mask(size-1)
  {
     // verify that size is actually a power of two, so the mask is all-ones in the low bits, and all-zeros in the high bits.
     // so that i % size == i & size_mask for all i
   ...
  }

  ...
  T *get_next() {
      size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed);  // still atomic, but we don't care which order different threads take blocks in
      idx &= size_mask;   // modulo our local copy of the idx
      return &arena[idx];
  }
};
Run Code Online (Sandbox Code Playgroud)

calloc如果使用new + memset 而不是,分配 arena 会更有效。操作系统在将页面提供给用户空间进程之前已经将其清零(以防止信息泄漏),因此将它们全部写入只是浪费工作。

  arena = new T[size];
  std::memset(arena, 0, sizeof(T) * size);

  // vs.

  arena = (T*)calloc(size, sizeof(T));
Run Code Online (Sandbox Code Playgroud)

自己编写页面确实会导致它们出错,因此它们都连接到真实的物理页面,而不仅仅是系统范围共享物理零页面的写时复制映射(就像它们在 new/malloc/calloc 之后)。在 NUMA 系统上,选择的物理页面可能取决于哪个线程实际接触该页面,而不是哪个线程进行分配。但由于您要重用该池,因此第一个写入页面的核心可能不是最终使用它最多的核心。

也许需要在微基准/性能计数器中寻找一些东西。