C++ - 多线程 - 线程之间的通信

cmp*_*x96 5 c++ multithreading condition-variable

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>

using namespace std;

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
    while(!workdone) {
        unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one();
     }
}

void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
    while(!workdone) {
        unique_lock<std::mutex> lk(m);
        cv.wait(lk);
        cout << numbers.front() << endl;
        numbers.pop();
        consumer_count++;

     }
}

int main() {
    condition_variable cv;
    mutex m;
    bool workdone = false;
    queue<int> numbers;

    //start threads
    thread producer(generateNumbers, ref(numbers), ref(cv), ref(m),     ref(workdone));
    thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));

    //wait for 3 seconds, then join the threads
    this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;

    producer.join();
    consumer.join();

    //output the counters
    cout << producer_count << endl;
    cout << consumer_count << endl;

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

大家好,我尝试用C++实现生产者-消费者-模式。生产者线程生成随机整数,将它们添加到队列中,然后通知消费者线程添加了一个新数字。

消费者线程等待通知,然后将队列的第一个元素打印到控制台并删除它。

我为每个添加到队列中的数字增加了一个计数器,并为每个从队列中取出的数字增加了一个计数器。

我期望程序完成后两个计数器保持相同的值,但是差异很大。表示添加到队列的计数器始终在百万范围内(在我上次测试中为 3871876),而表示从队列中取出数字的消费者的计数器始终低于 100k(在我上次测试中为 89993)。

有人可以向我解释为什么会有如此巨大的差异吗?我是否必须添加另一个条件变量,以便生产者线程也等待消费者线程?谢谢!

Jon*_*nas 4

无需一秒钟std::condition_variable,只需重复使用您拥有的即可。正如其他人提到的,您应该考虑使用std::atomic<bool>而不是普通的bool. 但我必须承认带有 -O3 的 g++ 并没有优化它。

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        cv.wait(lk); // Wait for the generator to complete
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

编辑:

为了避免偶发的差一错误,您可以使用以下命令:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load() or !numbers.empty())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        if (numbers.empty())
            cv.wait(lk); // Wait for the generator to complete
        if (numbers.empty())
            continue;
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(1));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}
Run Code Online (Sandbox Code Playgroud)