C++ 11 lockfree单一生产者单一消费者:如何避免忙等待

mbr*_*brt 9 c++ multithreading lock-free c++11

我正在尝试实现一个使用两个线程的类:一个用于生产者,一个用于消费者.当前实现不使用锁:

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>

using Queue =
        boost::lockfree::spsc_queue<
            int,
            boost::lockfree::capacity<1024>>;

class Worker
{
public:
    Worker() : working_(false), done_(false) {}
    ~Worker() {
        done_ = true;    // exit even if the work has not been completed
        worker_.join();
    }

    void enqueue(int value) {
        queue_.push(value);
        if (!working_) {
            working_ = true;
            worker_ = std::thread([this]{ work(); });
        }
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_ = false;
    }

private:
    std::atomic<bool> working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)

应用程序需要将工作项排入一定时间,然后等待事件休眠.这是模拟行为的最小主要:

int main()
{
    Worker w;
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < 1000; ++i)
        w.enqueue(i);
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
Run Code Online (Sandbox Code Playgroud)

我很确定我的实现有问题:如果工作线程完成并且在执行之前working_ = false,另一个enqueue是什么呢?是否可以在不使用锁的情况下使我的代码线程安全?

解决方案要求:

  • 快速入队
  • 即使队列不为空,析构函数也必须退出
  • 没有忙等待,因为工作线程闲置很长一段时间
  • 如果可能的话没有锁

编辑

Worker根据你的建议做了另一个类的实现.这是我的第二次尝试:

class Worker
{
public:
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false) { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        if (worker_.joinable())
            worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set()) {
            if (worker_.joinable())
                worker_.join();
            worker_ = std::thread([this]{ work(); });
        }
        return enqueued;
    }

    void work() {
        int value;
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
        working_.clear();
        while (!done_ && queue_.pop(value)) {
            std::cout << value << std::endl;
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    Queue queue_;
    std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)

我介绍worker_.join()enqueue方法内部.这可能会影响性能,但在非常罕见的情况下(当队列变空并且在线程退出之前,另一个enqueue出现).该working_变量现在是一个atomic_flag设置enqueue和清除的变量work.需要Additional whileafter working_.clear(),因为如果推送了另一个值,则在之前clear,但在之后while,不处理该值.

这个实现是否正确?

我做了一些测试,实现似乎工作.

OT:将它作为编辑或答案更好吗?

mbr*_*brt 0

这是我对问题的解答。我不太喜欢回答自己,但我认为展示实际代码可能会对其他人有所帮助。

#include <boost/lockfree/spsc_queue.hpp>
#include <atomic>
#include <thread>
// I used this semaphore class: https://gist.github.com/yohhoy/2156481
#include "binsem.hpp"

using Queue =
    boost::lockfree::spsc_queue<
        int,
        boost::lockfree::capacity<1024>>;

class Worker
{
public:
    // the worker thread starts in the constructor
    Worker()
        : working_(ATOMIC_FLAG_INIT), done_(false), semaphore_(0)
        , worker_([this]{ work(); })
    { } 

    ~Worker() {
        // exit even if the work has not been completed
        done_ = true;
        semaphore_.signal();
        worker_.join();
    }

    bool enqueue(int value) {
        bool enqueued = queue_.push(value);
        if (!working_.test_and_set())
            // signal to the worker thread to wake up
            semaphore_.signal();
        return enqueued;
    }

    void work() {
        int value;
        // the worker thread continue to live
        while (!done_) {
            // wait the start signal, sleeping
            semaphore_.wait();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
            working_.clear();
            while (!done_ && queue_.pop(value)) {
                // perform actual work
                std::cout << value << std::endl;
            }
        }
    }

private:
    std::atomic_flag working_;
    std::atomic<bool> done_;
    binsem semaphore_;
    Queue queue_;
    std::thread worker_;
};
Run Code Online (Sandbox Code Playgroud)

我尝试了@Cameron 的建议,不要关闭线程并添加信号量。enqueue这实际上仅在第一个和最后一个中使用work。这不是无锁的,但仅限于这两种情况。

我在之前的版本(请参阅我编辑的问题)和这个版本之间进行了一些性能比较。当开始和停止次数不多时,没有显着差异。然而,enqueue当它必须使用signal工作线程而不是启动一个新线程时,速度要快 10 倍。这是一个罕见的情况,所以不是很重要,但无论如何它是一个改进。

该实现满足:

  • 一般情况下是无锁的(当enqueuework很忙时);
  • 无需忙等待,以防长时间没有enqueue
  • 析构函数尽快退出
  • 正确性??:)