san*_*ank 1 c++ multithreading producer-consumer race-condition
在下面的代码中,我正在创建一个producer thread并n consumer threads从每个专用queue和打印到的读取stdout.这段代码有时会在声明中崩溃consumerQueues[id]->empty().通过调试去我看到consumerQueues[id]是0x0当它崩溃.现在在init()函数中,我在创建worker 之前创建了ith使用者.我不确定为什么会留下来.请帮我弄清楚发生了什么.queueiththreadconsumerQueues[id]0x0
#include <thread>
#include <queue>
#include <memory>
#include <iostream>
#include <mutex>
#include <condition_variable>
class Test
{
private:
void producer()
{
while(true)
{
std::string s = "abc";
for(const auto& q : consumerQueues)
{
std::unique_lock<std::mutex> lock(mutex);
q->push(s);
condition_variable.notify_all();
}
}
}
void consumer(int id)
{
while (true)
{
std::string job;
{
std::unique_lock<std::mutex> lock(mutex);
while(consumerQueues[id]->empty())
{
condition_variable.wait(lock);
}
job = consumerQueues[id]->front();
consumerQueues[id]->pop();
}
std::cout << "ID "<< id << " job " << job << std::endl;
}
}
std::mutex mutex;
std::condition_variable condition_variable;
std::vector<std::thread> workers;
std::vector<std::shared_ptr<std::queue<std::string>>> consumerQueues;
std::thread producerThread;
public:
Test(const unsigned n_threads):
workers(std::vector<std::thread>(n_threads))
{}
Test(const Test &) = delete;
Test(Test &&) = delete;
Test & operator=(const Test &) = delete;
Test & operator=(Test &&) = delete;
void init()
{
for (unsigned i = 0; i < workers.size(); ++i)
{
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
~Test()
{
producerThread.join();
for (unsigned i = 0; i < workers.size(); ++i)
{
if(workers[i].joinable())
{
workers[i].join();
}
}
}
};
int main()
{
Test t(1000);
t.init();
return 0;
}
Run Code Online (Sandbox Code Playgroud)
你的init函数正在修改没有互斥的std :: vector.这会在线程逐个启动的同时修改向量.
为了使这个工作,你的init函数需要是这样的:
void init() {
for (unsigned i = 0; i < workers.size(); ++i) {
std::unique_lock<std::mutex> lock(mutex);
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
Run Code Online (Sandbox Code Playgroud)
来自:http://www.cplusplus.com/reference/vector/vector/push_back/
数据竞赛
容器已修改.如果发生重新分配,则修改所有包含的元素.否则,不会访问现有元素,并且同时访问或修改它们是安全的.
当它从0元素开始并且变为1000时,重新分配经常发生.因此您还可以保留向量的大小以确保不会发生重新分配:
void init() {
consumerQueues.reserve(workers.size());
for (unsigned i = 0; i < workers.size(); ++i) {
consumerQueues.push_back(std::make_shared<std::queue<std::string>>());
workers[i] = std::thread(&Test::consumer, this, i);
}
producerThread = std::thread(&Test::producer, this);
}
Run Code Online (Sandbox Code Playgroud)