C++ Single Producer多个消费者程序偶尔崩溃

san*_*ank 1 c++ multithreading producer-consumer race-condition

在下面的代码中,我正在创建一个producer threadn consumer threads从每个专用queue和打印到的读取stdout.这段代码有时会在声明中崩溃consumerQueues[id]->empty().通过调试去我看到consumerQueues[id]0x0当它崩溃.现在在init()函数中,我在创建worker 之前创建了ith使用者.我不确定为什么会留下来.请帮我弄清楚发生了什么.queueiththreadconsumerQueues[id]0x0

#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>

class Test
{
private:
    void producer()
    {
        while(true)
        {
            std::string s = "abc";
            for(const auto& q : consumerQueues)
            {
                std::unique_lock<std::mutex> lock(mutex);
                q->push(s);
                condition_variable.notify_all();
            }
        }
    }

    void consumer(int id)
    {
        while (true)
        {
            std::string job;
            {
                std::unique_lock<std::mutex> lock(mutex);
                while(consumerQueues[id]->empty())
                {
                    condition_variable.wait(lock);
                }
                job = consumerQueues[id]->front();
                consumerQueues[id]->pop();
            }
            std::cout << "ID "<< id << " job " << job << std::endl;
        }
    }

    std::mutex mutex;
    std::condition_variable condition_variable;
    std::vector<std::thread> workers;
    std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
    std::thread producerThread;

public:

    Test(const unsigned n_threads):
    workers(std::vector<std::thread>(n_threads))
    {}

    Test(const Test &) = delete;
    Test(Test &&) = delete;

    Test & operator=(const Test &) = delete;
    Test & operator=(Test &&) = delete;

    void init()
    {
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
        }
       producerThread  = std::thread(&Test::producer, this);
    }

    ~Test()
    {
        producerThread.join();
        for (unsigned i = 0; i < workers.size(); ++i)
        {
            if(workers[i].joinable())
            {
                workers[i].join();
            }
        }
    }
};


int main()
{
    Test t(1000);
    t.init();
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

aho*_*olm 5

你的init函数正在修改没有互斥的std :: vector.这会在线程逐个启动的同时修改向量.

为了使这个工作,你的init函数需要是这样的:

 void init() {
     for (unsigned i = 0; i < workers.size(); ++i) {
            std::unique_lock<std::mutex> lock(mutex);
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }
Run Code Online (Sandbox Code Playgroud)

来自:http://www.cplusplus.com/reference/vector/vector/push_back/

数据竞赛

容器已修改.如果发生重新分配,则修改所有包含的元素.否则,不会访问现有元素,并且同时访问或修改它们是安全的.

当它从0元素开始并且变为1000时,重新分配经常发生.因此您还可以保留向量的大小以确保不会发生重新分配:

 void init() {
     consumerQueues.reserve(workers.size());
     for (unsigned i = 0; i < workers.size(); ++i) {
            consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
            workers[i] = std::thread(&Test::consumer, this, i);
     }
     producerThread  = std::thread(&Test::producer, this);
 }
Run Code Online (Sandbox Code Playgroud)

  • *但线程只访问已创建的`std :: vector`部分* - 但该部分将通过进一步的`push_back()`操作重新分配.你可以通过在`init()`中循环之前`reserve()`足够的元素来避免这种情况. (2认同)