C++相当于Java的BlockingQueue

Ben*_*Ben 30 c++ multithreading

我正在将一些Java代码移植到C++,并且一个特定部分使用BlockingQueue将消息从许多生产者传递给单个消费者.

如果您不熟悉Java BlockingQueue是什么,它只是一个具有硬容量的队列,它将线程安全方法暴露给队列中的put()和take().如果队列已满,则put()阻塞;如果队列为空,则使用take()块.此外,还提供了这些方法的超时敏感版本.

超时与我的用例相关,因此提供这些超时的建议是理想的.如果没有,我可以自己编写代码.

我已经google了一下,并迅速浏览了Boost库,我找不到这样的东西.也许我在这里失明了......但有人知道一个好推荐吗?

谢谢!

Die*_*ühl 46

它不是固定大小,它不支持超时,但这是我最近使用C++ 2011构造发布的队列的简单实现:

#include <mutex>
#include <condition_variable>
#include <deque>

template <typename T>
class queue
{
private:
    std::mutex              d_mutex;
    std::condition_variable d_condition;
    std::deque<T>           d_queue;
public:
    void push(T const& value) {
        {
            std::unique_lock<std::mutex> lock(this->d_mutex);
            d_queue.push_front(value);
        }
        this->d_condition.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(this->d_mutex);
        this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
        T rc(std::move(this->d_queue.back()));
        this->d_queue.pop_back();
        return rc;
    }
};
Run Code Online (Sandbox Code Playgroud)

扩展和使用定时等待弹出应该是微不足道的.我没有做到的主要原因是我对目前为止我想到的界面选择不满意.

  • `push()`中的作用域不是必需的,但如果没有它,则在保持锁定时发出条件变量的信号.在发出信号之前释放锁定使锁定随时可用. (8认同)
  • @Nitzanu:是的,您错过了条件变量的定义特征是它释放锁并使线程进入睡眠状态而不丢失信号。我特别指出锁将由“wait()”释放。这就是为什么使用“unique_lock”而不是“lock_guard”的原因,因为后者没有在不破坏的情况下释放锁的接口。当 wait() 返回时,它将重新获取锁。重要的是要认识到锁在“wait()”期间被释放,因为锁保护的其他内容也可能发生变化。 (3认同)

Ser*_*tch 6

这是具有关闭请求功能的阻塞队列的示例:

template <typename T> class BlockingQueue {
  std::condition_variable _cvCanPop;
  std::mutex _sync;
  std::queue<T> _qu;
  bool _bShutdown = false;

public:
  void Push(const T& item)
  {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _qu.push(item);
    }
    _cvCanPop.notify_one();
  }

  void RequestShutdown() {
    {
      std::unique_lock<std::mutex> lock(_sync);
      _bShutdown = true;
    }
    _cvCanPop.notify_all();
  }

  bool Pop(T &item) {
    std::unique_lock<std::mutex> lock(_sync);
    for (;;) {
      if (_qu.empty()) {
        if (_bShutdown) {
          return false;
        }
      }
      else {
        break;
      }
      _cvCanPop.wait(lock);
    }
    item = std::move(_qu.front());
    _qu.pop();
    return true;
  }
};
Run Code Online (Sandbox Code Playgroud)