Single-producer, single-consumer data structure with a double buffer in C++

use*_*506 27 c++ concurrency real-time producer-consumer double-buffering

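I have an application at $work where I have to move data between two real-time threads that are scheduled at different frequencies. (The actual scheduling is beyond my control.) The application is hard real-time (one of the threads has to drive a hardware interface), so the data transfer between the threads should be lock-free and, as far as possible, wait-free.

It is important to note that only one block of data needs to be transferred: because the two threads run at different rates, there will be times when two iterations of the faster thread complete between two wakeups of the slower thread. In that case it is fine to overwrite the data in the write buffer so that the slower thread only gets the latest data.

In other words, a double-buffered solution suffices instead of a queue. The two buffers are allocated during initialization, and the reader and writer threads can call methods of the class to get a pointer to one of these buffers.

C++ code: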

#include <mutex>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() {
        m_write_busy = false;
        m_read_idx = m_write_idx = 0;
    }

    ~ProducerConsumerDoubleBuffer() { }

    // The writer thread using this class must call
    // start_writing() at the start of its iteration
    // before doing anything else to get the pointer
    // to the current write buffer.
    T * start_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = true;
        m_write_idx = 1 - m_read_idx;

        return &m_buf[m_write_idx];
    }
    // The writer thread must call end_writing()
    // as the last thing it does
    // to release the write busy flag.
    void end_writing(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_write_busy = false;
    }

    // The reader thread must call start_reading()
    // at the start of its iteration to get the pointer
    // to the current read buffer.
    // If the write thread is not active at this time,
    // the read buffer pointer will be set to the 
    // (previous) write buffer - so the reader gets the latest data.
    // If the write buffer is busy, the read pointer is not changed.
    // In this case the read buffer may contain stale data,
    // it is up to the user to deal with this case.
    T * start_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        if (!m_write_busy) {
            m_read_idx = m_write_idx;
        }

        return &m_buf[m_read_idx];
    }
    // The reader thread must call end_reading()
    // at the end of its iteration.
    void end_reading(void) {
        std::lock_guard<std::mutex> lock(m_mutex);

        m_read_idx = m_write_idx;
    }

private:
    T m_buf[2];
    bool m_write_busy;
    unsigned int m_read_idx, m_write_idx;
    std::mutex m_mutex;
};

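To avoid acting on stale data in the reader thread, the payload structure is versioned. To allow bidirectional data transfer between the threads, two instances of the above monstrosity are used, one in each direction.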
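For illustration, here is a minimal sketch of what the versioning could look like. The type `VersionedPayload`, its fields, and the two helper functions are hypothetical names made up for this example, not the actual payload used in the application; the sketch only shows the idea of stamping each write with a counter so the reader can detect a buffer it has already consumed.

```cpp
#include <cstdint>

// Hypothetical payload: field names are illustrative only.
struct VersionedPayload {
    std::uint64_t version = 0;  // 0 means "never written"
    double setpoint = 0.0;      // stand-in for the real data
};

// Writer side: fill the buffer and stamp it with the next version.
// The caller is assumed to start next_version at 1.
inline void write_payload(VersionedPayload& slot,
                          std::uint64_t& next_version,
                          double setpoint) {
    slot.setpoint = setpoint;
    slot.version = next_version++;
}

// Reader side: copy the data out only if it is newer than the last
// version consumed; returns false for a stale (already seen) buffer.
inline bool consume_if_fresh(const VersionedPayload& slot,
                             std::uint64_t& last_seen,
                             double& out) {
    if (slot.version <= last_seen) {
        return false;  // stale data, nothing new since the last read
    }
    last_seen = slot.version;
    out = slot.setpoint;
    return true;
}
```

These helpers would be called on the pointer returned by `start_writing()` or `start_reading()`; they do not add any synchronization of their own.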

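Questions:

  • Is this scheme thread-safe? If it is broken, where?
  • Can it be done without the mutex? Perhaps with just memory barriers or CAS instructions?
  • Can it be made better?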

Cam*_*ron 11

Very interesting problem! Way trickier than I first thought :-) I like lock-free solutions, so I've tried to work one out below.

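There are many ways to think about this system. You could model it as a fixed-size circular buffer/queue (with two entries), but then you lose the ability to update the next value available for consumption, because you don't know whether the consumer has started reading the most recently published value or is still (potentially) reading the previous one. So extra state is needed beyond that of a standard ring buffer in order to reach a better solution.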

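First note that there is always one cell that the producer can safely write to at any given point in time; if the consumer is reading one cell, the other can be written to. Let's call the cell that can safely be written to the "active" cell (the cell that can potentially be read from is whichever cell is not the active one). The active cell can only be switched if the other cell is not currently being read.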

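Unlike the active cell, which can always be written to, the non-active cell can only be read from if it contains a value; once that value has been consumed, it is gone. (This means livelock is avoided in the case of an aggressive producer: at some point the consumer will have emptied a cell and will stop touching the cells. Once that happens, the producer can definitely publish a value, whereas before that point it can only publish a value (change the active cell) if the consumer is not in the middle of a read.)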

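If there is a value ready to be consumed, only the consumer can change that fact (for the non-active cell, anyway). Subsequent productions may change which cell is active and which value is published, but until it is consumed there will always be a value ready to be read.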

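Once the producer has finished writing to the active cell, it can "publish" that value by changing which cell is the active one (swapping the index), provided the consumer is not in the middle of reading the other cell. If the consumer is in the middle of reading the other cell, the swap cannot happen, but in that case the consumer can swap after it has finished reading, provided the producer is not in the middle of a write (and if it is, the producer will swap once it's done). In fact, the consumer can in general always swap after it has finished reading (if it is the only one accessing the system), because spurious swaps by the consumer are benign: if there is something in the other cell, swapping causes it to be read next, and if there isn't, swapping affects nothing.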

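So, we need a shared variable to track which cell is active, and we also need a way for both the producer and the consumer to indicate whether they are in the middle of an operation. We can store these three pieces of state in one atomic variable so that they can all be changed at once (atomically). We also need a way for the consumer to check whether there is anything in the non-active cell in the first place, and for both threads to modify that state as appropriate. I tried a few other approaches, but in the end the easiest was to put this information into the same atomic variable as well. That makes things much simpler to reason about, because every state change in the system is then atomic.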
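To make the packed state easier to picture, here is a small sketch of helper functions that decode such a state word. The bit layout is the one described in the comments of the implementation in this answer (bit 0 for the active cell, bits 1 and 2 for the user count, bits 3 and 4 for the per-cell "full" flags); the constant and function names are made up for this illustration and do not appear in the implementation.

```cpp
#include <cstdint>

// Assumed bit layout of the shared state word:
//   bit 0    : index of the active (writable) cell
//   bits 1-2 : number of concurrent users (0, 1 or 2)
//   bit 3    : m_buf[0] holds a value ready for reading
//   bit 4    : m_buf[1] holds a value ready for reading
constexpr std::uint32_t kActiveMask = 0x1;
constexpr std::uint32_t kUserMask   = 0x6;
constexpr std::uint32_t kOneUser    = 0x2;
constexpr std::uint32_t kFullCell0  = 0x8;

// Index of the cell the producer may write to.
constexpr std::uint32_t active_cell(std::uint32_t s) {
    return s & kActiveMask;
}

// Number of threads currently between a start_*() and end_*() call.
constexpr std::uint32_t user_count(std::uint32_t s) {
    return (s & kUserMask) >> 1;
}

// Whether the given cell (0 or 1) holds an unread value.
constexpr bool cell_full(std::uint32_t s, std::uint32_t cell) {
    return (s & (kFullCell0 << cell)) != 0;
}

// The active cell may only be switched when nobody is mid-operation.
constexpr bool can_swap(std::uint32_t s) {
    return user_count(s) == 0;
}
```

Because everything lives in one word, a single `fetch_add` or `compare_exchange_strong` on it can, for example, release a user count and set a full flag in the same atomic step.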

I've come up with a wait-free implementation (lock-free, and every operation completes in a bounded number of instructions).

Code time!

#include <atomic>
#include <cstdint>

template <typename T>
class ProducerConsumerDoubleBuffer {
public:
    ProducerConsumerDoubleBuffer() : m_state(0) { }
    ~ProducerConsumerDoubleBuffer() { }

    // Never returns nullptr
    T* start_writing() {
        // Increment active users; once we do this, no one
        // can swap the active cell on us until we're done
        auto state = m_state.fetch_add(0x2, std::memory_order_relaxed);
        return &m_buf[state & 1];
    }

    void end_writing() {
        // We want to swap the active cell, but only if we were the last
        // ones concurrently accessing the data (otherwise the consumer
        // will do it for us when *it's* done accessing the data)

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1)));
        state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2;
        if ((state & 0x6) == 0) {
            // The consumer wasn't in the middle of a read, we should
            // swap (unless the consumer has since started a read or
            // already swapped or read a value and is about to swap).
            // If we swap, we also want to clear the full flag on what
            // will become the active cell, otherwise the consumer could
            // eventually read two values out of order (it reads a new
            // value, then swaps and reads the old value while the
            // producer is idle).
            m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release);
        }
    }

    // Returns nullptr if there appears to be no more data to read yet
    T* start_reading() {
        m_readState = m_state.load(std::memory_order_relaxed);
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // Nothing to read here!
            return nullptr;
        }

        // At this point, there is guaranteed to be something to
        // read, because the full flag is never turned off by the
        // producer thread once it's on; the only thing that could
        // happen is that the active cell changes, but that can
        // only happen after the producer wrote a value into it,
        // in which case there's still a value to read, just in a
        // different cell.

        m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2;

        // Now that we've incremented the user count, nobody can swap until
        // we decrement it
        return &m_buf[(m_readState & 1) ^ 1];
    }

    void end_reading() {
        if ((m_readState & (0x10 >> (m_readState & 1))) == 0) {
            // There was nothing to read; shame to repeat this
            // check, but if these functions are inlined it might
            // not matter. Otherwise the API could be changed.
            // Or just don't call this method if start_reading()
            // returns nullptr -- then you could also get rid
            // of m_readState.
            return;
        }

        // Alright, at this point the active cell cannot change on
        // us, but the active cell's flag could change and the user
        // count could change. We want to release our user count
        // and remove the flag on the value we read.

        auto state = m_state.load(std::memory_order_relaxed);
        std::uint32_t sub = (0x10 >> (state & 1)) | 0x2;
        state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub;
        if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) != 0) {
            // Oi, we were the last ones accessing the data when we released our cell.
            // That means we should swap, but only if the producer isn't in the middle
            // of producing something, and hasn't already swapped, and hasn't already
            // set the flag we just reset (which would mean they swapped an even number
            // of times).  Note that we don't bother swapping if there's nothing to read
            // in the other cell.
            m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed);
        }
    }

private:
    T m_buf[2];

    // The bottom (lowest) bit will be the active cell (the one for writing).
    // The active cell can only be switched if there's at most one concurrent
    // user. The next two bits of state will be the number of concurrent users.
    // The fourth bit indicates if there's a value available for reading
    // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1].
    std::atomic<std::uint32_t> m_state;

    std::uint32_t m_readState;
};

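Note that the semantics are such that the consumer can never read a given value twice, and any value it does read is always newer than the last value it read. It is also fairly efficient in memory usage (two buffers, like your original solution). I avoided CAS loops because they are generally less efficient under contention than a single atomic operation.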
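To illustrate the difference, here is a rough sketch of the same "register one more user" step written both ways. The function names are hypothetical and exist only for this comparison. The `fetch_add` version is a single read-modify-write; the CAS version has to retry whenever another thread changes the state between the load and the exchange, so it is lock-free but has no fixed bound on the number of attempts.

```cpp
#include <atomic>
#include <cstdint>

// Single atomic read-modify-write: adds one user (0x2) and returns
// the new state. No retry logic is needed at the source level.
inline std::uint32_t enter_with_fetch_add(std::atomic<std::uint32_t>& state) {
    return state.fetch_add(0x2, std::memory_order_acquire) + 0x2;
}

// Equivalent CAS loop: retries until no other thread has modified the
// state between our read and our exchange.
inline std::uint32_t enter_with_cas_loop(std::atomic<std::uint32_t>& state) {
    std::uint32_t expected = state.load(std::memory_order_relaxed);
    while (!state.compare_exchange_weak(expected, expected + 0x2,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed)) {
        // On failure, expected has been reloaded with the current
        // value, so the loop simply tries again with fresh data.
    }
    return expected + 0x2;
}
```

One caveat: on architectures without a native fetch-and-add instruction, the compiler may itself implement `fetch_add` with a load-linked/store-conditional retry loop, so how much is gained depends on the target hardware.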

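If you decide to use the code above, I suggest you first write some comprehensive (threaded) unit tests for it, and proper benchmarks. I did test it, but only just barely. Let me know if you find any bugs :-)

My unit test: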

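#include <cassert>
#include <thread>

// Assumes the ProducerConsumerDoubleBuffer template above is defined
// earlier in the same translation unit.
int main() {
    ProducerConsumerDoubleBuffer<int> buf;
    std::thread producer([&]() {
        for (int i = 0; i != 500000; ++i) {
            int* item = buf.start_writing();
            if (item != nullptr) {      // Always true
                *item = i;
            }
            buf.end_writing();
        }
    });
    std::thread consumer([&]() {
        int prev = -1;
        for (int i = 0; i != 500000; ++i) {
            int* item = buf.start_reading();
            if (item != nullptr) {
                // Every value read must be newer than the previous one
                assert(*item > prev);
                prev = *item;
            }
            buf.end_reading();
        }
    });
    producer.join();
    consumer.join();
    return 0;
}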

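As for your original implementation, I only looked at it cursorily (it's much more fun to design new stuff, hehe), but david.pfx's answer seems to address that part of your question.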