使用std :: mutex,std :: condition_variable和std :: unique_lock

fis*_*ood 12 c++ concurrency multithreading mutex condition-variable

我在理解条件变量及其与互斥体的使用方面遇到了一些麻烦,我希望社区可以帮助我.请注意,我来自win32背景,所以我使用CRITICAL_SECTION,HANDLE,SetEvent,WaitForMultipleObject等.

这是我第一次使用c ++ 11标准库进行并发尝试,它是此处程序示例的修改版本.

#include <condition_variable>
#include <mutex>
#include <algorithm>
#include <thread>
#include <queue>
#include <chrono>
#include <iostream>


int _tmain(int argc, _TCHAR* argv[])
{   
    std::queue<unsigned int>    nNumbers;

    std::mutex                  mtxQueue;
    std::condition_variable     cvQueue;
    bool                        m_bQueueLocked = false;

    std::mutex                  mtxQuit;
    std::condition_variable     cvQuit;
    bool                        m_bQuit = false;


    std::thread thrQuit(
        [&]()
        {
            using namespace std;            

            this_thread::sleep_for(chrono::seconds(7));

            // set event by setting the bool variable to true
            // then notifying via the condition variable
            m_bQuit = true;
            cvQuit.notify_all();
        }
    );

    std::thread thrProducer(
        [&]()
        {           
            using namespace std;

            int nNum = 0;
            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                   ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                nNum ++;

                unique_lock<mutex> qLock(mtxQueue);
                cout << "Produced: " << nNum << "\n";
                nNumbers.push( nNum );              
            }
        }
    );

    std::thread thrConsumer(
        [&]()
        {
            using namespace std;            

            unique_lock<mutex> lock( mtxQuit );

            while( ( ! m_bQuit ) && 
                    ( cvQuit.wait_for( lock, chrono::milliseconds(10) ) == cv_status::timeout ) )
            {
                unique_lock<mutex> qLock(mtxQueue);
                if( nNumbers.size() > 0 )
                {
                    cout << "Consumed: " << nNumbers.front() << "\n";
                    nNumbers.pop();
                }               
            }
        }
    );

    thrQuit.join();
    thrProducer.join();
    thrConsumer.join();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

关于这一点的几个问题.

我已经读过 "任何打算在std :: condition_variable上等待的线程必须先获取std :: unique_lock."

所以我有一个{quit mutex,condition variable&bool}来指示退出时的信号.生产者和消费者线程必须各自获取std :: unique_lock,如下所示:

std::unique_lock<std::mutex> lock(m_mtxQuit);
Run Code Online (Sandbox Code Playgroud)

这让我感到困惑.这不会锁定第一个线程中的退出互斥锁,从而阻塞第二个线程吗?如果这是真的,那么第一个线程如何释放锁,以便另一个线程可以开始?

另一个问题:如果我将wait_for()调用更改为等待零秒,则该线程将被饿死.谁能解释一下?我希望它在执行while循环之前不会阻塞(我是否正确假设no_timeout被重新获取而不是超时?).

如何调用wait_for()并指定零时间,以便wait_for()调用不会阻塞,而只是检查条件并继续?

我也有兴趣听到关于这个主题的好的参考资料.

Jon*_*ely 12

这不会锁定第一个线程中的退出互斥锁,从而阻塞第二个线程吗?

是.

如果这是真的,那么第一个线程如何释放锁,以便另一个线程可以开始?

当你等待condition_variable它时解锁你传递它的锁,所以

cvQuit.wait_for( lock, chrono::milliseconds(10) )
Run Code Online (Sandbox Code Playgroud)

条件变量将调用lock.unlock()然后阻塞长达10毫秒(这是原子地发生的,因此在解锁互斥锁和阻止条件可以准备就绪之前没有窗口,你会错过它)

当互斥锁解锁时,它允许另一个线程获取锁定.

另一个问题:如果我将wait_for()调用更改为等待零秒,则该线程将被饿死.谁能解释一下?

我希望另一个线程被饿死,因为互斥锁没有解锁足够长的时间让另一个线程锁定它.

我是否正确假设no_timeout被收回而不是超时?

不,如果持续时间过去而条件没有准备就绪,那么即使在零秒之后它也会"超时".

如何调用wait_for()并指定零时间,以便wait_for()调用不会阻塞,而只是检查条件并继续?

不要使用条件变量!如果您不想等待条件变为真,请不要等待条件变量!只是测试m_bQuit并继续.(除此之外,为什么你的布尔值被称为m_bXxx?他们不是成员,所以m_前缀是误导性的,b前缀看起来像匈牙利表示法的那种糟糕的MS习惯......这很糟糕.)

我也有兴趣听到关于这个主题的好的参考资料.

最好的参考是Anthony Williams的C++ Concurrency In Action,它详细介绍了整个C++ 11原子和线程库,以及多线程编程的一般原则.我最喜欢的一本关于这个主题的书是Butenhof的POSIX Threads编程,它专门针对Pthreads,但是C++ 11工具与Pthreads非常接近,因此很容易将该书的信息传递给C++ 11多线程.

注意:在thrQuitm_bQuit没有使用互斥锁保护它的情况下写入,因为没有什么能阻止另一个线程在写入的同时读取它,这是一种竞争条件,即未定义的行为.对bool的写入必须由互斥锁保护,或者必须是原子类型,例如std::atomic<bool>

我认为你不需要两个互斥锁,它只是增加了争用.因为mtxQuit在等待时你永远不会释放except,condition_variable所以没有第二个互斥锁,这个mtxQuit已经确保只有一个线程可以一次进入临界区.