C++ 11线程安全队列

Mat*_*ine 62 c++ queue multithreading condition-variable c++11

我正在研究的项目使用多个线程来处理文件集合.每个线程都可以将文件添加到要处理的文件列表中,因此我将(我认为是)一个线程安全的队列放在一起.相关部分如下:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,我偶尔会在if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }块内部进行segfaulting ,并且gdb中的检查表明由于队列为空而发生了段错误.这怎么可能?我的理解是,wait_for只有cv_status::no_timeout在收到通知后才会返回,而这应该只在FileQueue::enqueue将新项目推送到队列之后才会发生.

Che*_*ent 51

只要看一下它,当你检查一个条件变量时,最好使用一个while循环(这样如果它被唤醒并且仍然无效,你再次检查).我刚刚为异步队列编写了一个模板,希望这会有所帮助.

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif
Run Code Online (Sandbox Code Playgroud)

  • @Ajay 为了那些将来来这里的人(像我一样)阅读你的评论,不,事实并非如此,因为 [`condition_variable::wait` 释放锁并且仅在唤醒时重新获取](https://en .cppreference.com/w/cpp/thread/condition_variable/wait) (4认同)
  • 仅供参考:while(q.empty())`循环等效于:`c.wait(lock,[&] {return!q.empty();});` (3认同)
  • 谢谢!幸运的是,我已经使用 [here](http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for) 描述的谓词解决了这个问题。 (2认同)
  • 恕我直言,最简单,最优雅的解决方案。 (2认同)

Gri*_*zly 30

根据标准condition_variables允许虚假唤醒,即使事件没有发生.在虚假唤醒的情况下,它将返回cv_status::no_timeout(因为它醒来而不是超时),即使它没有被通知.对此的正确解决方案当然是在程序之前检查唤醒是否真的合法.

详细信息在标准§30.5.1[thread.condition.condvar]中指定:

- 当通过调用notify_one(),调用notify_all(),abs_time指定的绝对超时(30.2.4)到期或伪造时发出信号时,该函数将解除阻塞.

...

返回: cv_status :: timeout如果绝对超时(30.2.4)指定为abs_time到期,则为other-ise cv_status :: no_timeout.

  • 如果使用lambda作为wait()调用的可选参数,它将对您进行检查并防止虚假唤醒产生任何影响. (5认同)
  • 作为历史记录,这很可能是为了支持低级Unix系统调用的行为.如果你发现从客户设计的角度来看有点不能令人满意,那么理查德·加布里尔在1989年也是如此.他对这一主题的思考成了一篇非常有名的软件设计论文,["更糟糕的是更好"的崛起](https:/ /www.jwz.org/doc/worse-is-better.html) (5认同)
  • 附录:我可以使用诸如[here](http://en.cppreference.com/w/cpp/thread/condition_variable/wait_for)之类的谓词来防止虚假唤醒. (4认同)
  • 是.它被称为_condition_变量,因为它与某些条件相关联,您必须检查它实际上是真的.在你的情况下,要检查的条件是`!q.empty()` (2认同)

ron*_*nag 13

这可能是你应该怎么做的:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}
Run Code Online (Sandbox Code Playgroud)


qua*_*dev 10

添加到接受的答案,我会说实现正确的多生产者/多消费者队列是困难的(从C++ 11开始,更容易)

我建议你尝试(非常好的)无升级,"队列"结构将做你想要的,无需等待/无锁保证,而不需要C++ 11编译器.

我现在正在添加这个答案,因为无锁库是一个很新的提升(因为1.53我相信)

  • 感谢您指出那个图书馆。但是,目前似乎没有队列的文档。关于在哪里可以找到的任何想法? (2认同)

Sla*_*ica 5

我会将你的出队函数重写为:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}
Run Code Online (Sandbox Code Playgroud)

它更短,并没有像你那样重复的代码.只发出它可能会等待更长的超时.为了防止你需要记住循环前的开始时间,检查超时并相应地调整等待时间.或者在等待条件下指定绝对时间.