此无锁 SPSC 环形缓冲区队列中每个原子的内存顺序是否正确?

Den*_*199 9 c++ multithreading lock-free memory-barriers stdatomic

我有一个环形缓冲区,如下所示:

template<class T>
class RingBuffer {
public:
  bool Publish();
 
  bool Consume(T& value);

  bool IsEmpty(std::size_t head, std::size_t tail);

  bool IsFull(std::size_t head, std::size_t tail);

private:
  std::size_t Next(std::size_t slot);

  std::vector<T> buffer_;
  std::atomic<std::size_t> tail_{0};
  std::atomic<std::size_t> head_{0};
  static constexpr std::size_t kBufferSize{8};
};
Run Code Online (Sandbox Code Playgroud)

该数据结构旨在与两个线程一起使用:发布者线程和消费者线程。下面列出了两个不将内存顺序传递给原子的主要函数:

bool Publish(T value) {
    const size_t curr_head = head_.load(/* memory order */);
    const size_t curr_tail = tail_.load(/* memory_order */);

    if (IsFull(curr_head, curr_tail)) {
      return false;
    }

    buffer_[curr_tail] = std::move(value);
    tail_.store(Next(curr_tail) /*, memory order */);
    return true;
  }

bool Consume(T& value) {
    const size_t curr_head = head_.load(/* memory order */);
    const size_t curr_tail = tail_.load(/* memory order */);

    if (IsEmpty(curr_head, curr_tail)) {
      return false;
    }

    value = std::move(buffer_[curr_head]);
    head_.store(Next(curr_head) /*, memory order */);
    return true;
  }
Run Code Online (Sandbox Code Playgroud)

我知道,至少,我必须tail_.store()Publish()函数 withstd::memory_order::releasetail_.load()with中创建函数,std::memory_order::acquire然后才能在 write to和 read之间进行连接。另外,我可以传递到in和 to in ,因为同一个线程将看到对原子的最后一次写入。现在函数是这样的:Consume()buffer_buffer_std::memory_order::relaxedtail_.load()Publish()head_.load()Consume()

 bool Publish(T value) {
     const size_t curr_head = head_.load(/* memory order */);
     const size_t curr_tail = tail_.load(std::memory_order::relaxed);
    
     if (IsFull(curr_head, curr_tail)) {
         return false;
     }
    
     buffer_[curr_tail] = std::move(value);
     tail_.store(Next(curr_tail), std::memory_order::release);
     return true;
 }

 bool Consume(T& value) {
     const size_t curr_head = head_.load(std::memory_order::relaxed);
     const size_t curr_tail = tail_.load(std::memory_order::acquire);
    
     if (IsEmpty(curr_head, curr_tail)) {
         return false;
     }
    
     value = std::move(buffer_[curr_head]);
     head_.store(Next(curr_head) /*, memory order */);
     return true;
 }
Run Code Online (Sandbox Code Playgroud)

最后一步是将内存顺序放入剩余的对中:head_.load()inPublish()head_.store()in Consume()。我必须在 invalue = std::move(buffer_[curr_head]);之前执行行,否则在缓冲区已满的情况下我会发生数据竞争,因此,至少我必须传递到该存储操作以避免重新排序。但我必须将其放入函数中吗?我知道这将有助于看到与不同的合理时间,但如果我不需要这种较短时间的保证来看到存储操作的副作用,我可以有一个宽松的内存顺序吗?如果我不能,那为什么?完成的代码:head_.store()Consume()std::memory_order::releasestd::memory_order::acquirehead_.load()Publish()head_.load()head_.store()std::memory_order::relaxed

bool Publish(T value) {
    const size_t curr_head = head_.load(std::memory_order::relaxed); // or acquire?
    const size_t curr_tail = tail_.load(std::memory_order::relaxed);
        
    if (IsFull(curr_head, curr_tail)) {
        return false;
    }
        
    buffer_[curr_tail] = std::move(value);
    tail_.store(Next(curr_tail), std::memory_order::release);
    return true;
}
    
bool Consume(T& value) {
    const size_t curr_head = head_.load(std::memory_order::relaxed);
    const size_t curr_tail = tail_.load(std::memory_order::acquire);
        
    if (IsEmpty(curr_head, curr_tail)) {
        return false;
    }
        
    value = std::move(buffer_[curr_head]);
    head_.store(Next(curr_head), std::memory_order::release);
    return true;
}
Run Code Online (Sandbox Code Playgroud)

每个原子的内存顺序是否正确?我对每个原子变量中每个内存顺序的使用的解释是否正确?

Tig*_*ire 6

以前的答案可能有助于作为背景:
c++、std::atomic、什么是 std::memory_order 以及如何使用它们?
https://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/

首先,您描述的系统被称为单一生产者-单一消费者队列。您始终可以查看此容器的 boost 版本进行比较。我经常会检查 boost 代码,即使我在不​​允许 boost 的情况下工作。这是因为检查和理解稳定的解决方案将使您深入了解可能遇到的问题(他们为什么这样做?哦,我明白了 - 等等)。鉴于您的设计,并且已经编写了许多类似的容器,我会说您的设计必须小心区分空和满。如果您使用经典的 {begin,end} 对,您会遇到由于包装而导致的问题

{开始,开始+大小} == {开始,开始} == 空

好吧,回到同步问题。

鉴于该命令仅影响重新排序,因此在“发布”中使用“发布”似乎是对该标志的教科书使用。在容器的大小增加之前,没有任何东西会读取该值,因此您不关心该值本身的写入顺序是否以随机顺序发生,您只关心该值必须在计数增加之前完全写入。所以我同意,您在发布功能中正确使用了该标志。
我确实质疑消费中是否需要“释放”,但如果您要移出队列,并且这些移动会产生副作用,则可能需要它。我想说,如果您追求原始速度,那么可能值得制作第二个版本,该版本专门用于琐碎的对象,使用宽松的顺序来增加头部。

您还可以在推送/弹出时考虑就地新建/删除。虽然大多数移动都会使对象处于空状态,但标准仅要求在移动后使其保持有效状态。移动后显式删除对象可能会避免您以后出现隐蔽的错误。

您可能会争辩说,consume 中的两个原子负载可能是memory_order_consume。这放宽了约束,表示“我不在乎它们的加载顺序,只要它们在使用时都已加载”。尽管我怀疑它在实践中会产生任何收益。我也对这个建议感到紧张,因为当我查看增强版本时,它与您所拥有的非常接近。 https://www.boost.org/doc/libs/1_66_0/boost/lockfree/spsc_queue.hpp

 template <typename Functor>
    bool consume_one(Functor & functor, T * buffer, size_t max_size)
    {
        const size_t write_index = write_index_.load(memory_order_acquire);
        const size_t read_index  = read_index_.load(memory_order_relaxed);
        if ( empty(write_index, read_index) )
            return false;

        T & object_to_consume = buffer[read_index];
        functor( object_to_consume );
        object_to_consume.~T();

        size_t next = next_index(read_index, max_size);
        read_index_.store(next, memory_order_release);
        return true;
    }
Run Code Online (Sandbox Code Playgroud)

总的来说,虽然你的方法看起来不错并且与增强版本非常相似。但是...如果我是你,我可能会逐行浏览 boost 版本,看看它有什么不同。这是很容易犯错误的。

编辑:抱歉,我刚刚注意到您的代码中的 memory_order_acquire/memory_order_relaxed 标记的顺序是错误的。您应该将最后一个写为“发布”,将第一个写为“获取”。这不会显着影响行为,但使其更易于阅读。(开始时同步,结束时同步)

回复评论:正如@user17732522暗示的那样,复制操作也是写入,所以对琐碎对象的优化不会同步,并且对琐碎对象的优化会引入U/B(妈的,很容易出错)