使用互斥锁和条件变量进行线程同步

avo*_*ado 7 multithreading mutex condition-variable c++11

我正在尝试实现多线程作业,生产者和消费者,基本上我想要做的是,当消费者完成数据时,它会通知生产者,以便生产者提供新数据.

棘手的部分是,在我目前的impl中,生产者和消费者都互相通知并相互等待,我不知道如何正确地实现这一部分.

例如,请参阅下面的代码,

mutex m;
condition_variable cv;

vector<int> Q;  // this is the queue the consumer will consume
vector<int> Q_buf;  // this is a buffer Q into which producer will fill new data directly

// consumer
void consume() {
  while (1) {
    if (Q.size() == 0) {  // when consumer finishes data
      unique_lock<mutex> lk(m);
      // how to notify producer to fill up the Q?
      ...
      cv.wait(lk);
    }

    // for-loop to process the elems in Q
    ...
  }
}

// producer
void produce() {
  while (1) {
    // for-loop to fill up Q_buf
    ...

    // once Q_buf is fully filled, wait until consumer asks to give it a full Q
    unique_lock<mutex> lk(m);
    cv.wait(lk);
    Q.swap(Q_buf);  // replace the empty Q with the full Q_buf
    cv.notify_one();
  }
}
Run Code Online (Sandbox Code Playgroud)

我不确定上面的代码是否正确使用mutex并且condition_variable是实现我的想法的正确方法,请给我一些建议!

Max*_*kin 9

代码错误地假设vector<int>::size()并且vector<int>::swap()是原子的.他们不是.

此外,虚假唤醒必须由while循环(或另一个cv::wait重载)处理.

修正:

mutex m;
condition_variable cv;
vector<int> Q;

// consumer
void consume() {
    while(1) {
        // Get the new elements.
        vector<int> new_elements;
        {
            unique_lock<mutex> lk(m);
            while(Q.empty())
                cv.wait(lk);
            new_elements.swap(Q);
        }
        // for-loop to process the elems in new_elements
    }
}

// producer
void produce() {
    while(1) {
        vector<int> new_elements;
        // for-loop to fill up new_elements

        // publish new_elements
        {
            unique_lock<mutex> lk(m);
            Q.insert(Q.end(), new_elements.begin(), new_elements.end());
            cv.notify_one();
        }
    }
}
Run Code Online (Sandbox Code Playgroud)