前段时间我一直在考虑如何相互实现各种同步原语.例如,在pthreads中,您可以获得互斥锁和条件变量,从中可以构建信号量.
在Windows API(或至少是旧版本的Windows API)中,存在互斥锁和信号量,但没有条件变量.我认为应该可以用互斥量和信号量来构建条件变量,但对于我的生活,我只是想不出办法.
有没有人知道这样做的好建筑?
我还没有完全围绕C++ 11多线程的东西,但我试图让多个线程等到主线程上的某个事件然后一次继续(处理发生的事情),并wait再次当他们'完成处理......循环直到它们关闭.以下不完全是 - 这是我的问题的简单再现:
std::mutex mutex;
std::condition_variable cv;
std::thread thread1([&](){ std::unique_lock<std::mutex> lock(mutex); cv.wait(lock); std::cout << "GO1!\n"; });
std::thread thread2([&](){ std::unique_lock<std::mutex> lock(mutex); cv.wait(lock); std::cout << "GO2!\n"; });
cv.notify_all(); // Something happened - the threads can now process it
thread1.join();
thread2.join();
Run Code Online (Sandbox Code Playgroud)
这有效...除非我停在一些断点上并放慢速度.当我这样做,我看Go1!,然后挂断,等待thread2的cv.wait.怎么了?
也许我不应该使用条件变量......周围没有任何条件wait,也没有需要使用互斥锁保护的数据.我该怎么做呢?
我今天正在尝试Windows对条件变量的支持(由微软为Windows Vista及更高版本提供).要初始化一个条件变量,我调用InitializeConditionVariable(),这很简单,但是当我使用它时,我没有看到任何方法来破坏条件变量.为什么没有DeleteConditionVariable()函数?
(我希望API类似于现有的CreateCriticalSection()/ DestroyCriticalSection()API)
我在www.cppreference.com上找到了以下关于条件变量的示例,http: //en.cppreference.com/w/cpp/thread/condition_variable .对cv.notify_one()的调用放在锁外.我的问题是,如果在保持锁定的同时进行调用以保证等待线程实际上处于等待状态并且将接收通知信号.
#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
std::mutex m;
std::condition_variable cv;
std::string data;
bool ready = false;
bool processed = false;
void worker_thread()
{
// Wait until main() sends data
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{return ready;});
// after the wait, we own the lock.
std::cout << "Worker thread is processing data\n";
data += " after processing";
// Send data back to main()
processed = true;
std::cout << "Worker thread signals data processing completed\n";
// …Run Code Online (Sandbox Code Playgroud) 我试图在我的类中使用一个线程,然后线程需要使用a condition_variable,条件变量将被阻塞,直到谓词被更改为true.代码如下所示:
class myThreadClass{
bool bFlag;
thread t ;
mutex mtx;
condition_variable cv;
bool myPredicate(){
return bFlag;
}
int myThreadFunction(int arg){
while(true){
unique_lock<mutex> lck(mtx);
if(cv.wait_for(lck,std::chrono::milliseconds(3000),myPredicate)) //something wrong?
cout<<"print something...1"<<endl
else
cout<<"print something...2"<<endl
}
}
void createThread(){
t = thread(&myThreadClass::myThreadFunction,this,10);//this is ok
}
} ;
Run Code Online (Sandbox Code Playgroud)
编译时的这段代码会抛出错误说:
"wait_for"行中未解决的重载函数类型.
然后我尝试将其修改为:
if(cv.wait_for(lck,std::chrono::milliseconds(3000),&myThreadClass::myPredicate))
Run Code Online (Sandbox Code Playgroud)
但仍然存在错误.
我在Bjarne Stroustrup的"The C++ Programming Language,4th Edition"(第119页)中偶然发现了以下代码:
queue<Message> mqueue;
condition_variable mcond;
mutex mmutex;
void consumer()
{
while(true) {
unique_lock<mutex> lck{mmutex};
mcond.wait(lck);
auto m = mqueue.front();
mqueue.pop();
lck.unlock();
// process m
}
}
Run Code Online (Sandbox Code Playgroud)
还有一个生产者线程,它推Message送到队列并在循环中通知等待的线程.
我的问题是:是否需要unique_lock在循环的每次迭代中创建一个新的?这对我来说似乎是不必要的,因为在下一行中,mcond.wait(lck)锁在锁定之前直接解锁.
从性能的角度来看,lck变量只能在循环开始之前初始化?
为什么std::condition_variable::wait需要一个互斥体作为其变量之一?
您可以查看文档并引用:
wait... Atomically releases lock
Run Code Online (Sandbox Code Playgroud)
但这不是真正的原因。这只是进一步验证了我的问题:为什么它首先需要它?
谓词最有可能查询共享资源的状态,它必须受到锁保护。
好的。公平的。这里有两个问题
int i = 0;
void waits()
{
std::unique_lock<std::mutex> lk(cv_m);
cv.wait(lk, []{return i == 1;});
std::cout << i;
}
Run Code Online (Sandbox Code Playgroud)
int i = 0;
void waits()
{
cv.wait(lk);
std::unique_lock<std::mutex> lk(cv_m);
std::cout << i;
}
Run Code Online (Sandbox Code Playgroud)
我知道这种做法没有有害影响。我只是不知道如何向自己解释为什么它是这样设计的?
如果谓词是可选的并且没有传递给wait,为什么我们需要锁?
我正在尝试实现多线程作业,生产者和消费者,基本上我想要做的是,当消费者完成数据时,它会通知生产者,以便生产者提供新数据.
棘手的部分是,在我目前的impl中,生产者和消费者都互相通知并相互等待,我不知道如何正确地实现这一部分.
例如,请参阅下面的代码,
mutex m;
condition_variable cv;
vector<int> Q; // this is the queue the consumer will consume
vector<int> Q_buf; // this is a buffer Q into which producer will fill new data directly
// consumer
void consume() {
while (1) {
if (Q.size() == 0) { // when consumer finishes data
unique_lock<mutex> lk(m);
// how to notify producer to fill up the Q?
...
cv.wait(lk);
}
// for-loop to process the elems in Q
...
}
}
// producer …Run Code Online (Sandbox Code Playgroud) 一些内核提供对信号量的“刷新”操作以解除对等待信号量的所有任务的阻塞。
例如,VxWorks 有一个semFlush() API,它可以原子地解除对指定信号量上挂起的所有任务的阻塞,即所有任务在允许运行之前都将被解除阻塞。
我正在 Linux 上实现一个 C++ 类,它的行为类似于二进制信号量,并且还具有这种“刷新”功能。不幸的是,Linux 上的semaphore.h不提供类似 API 的 flush() 或 broadcast() 。
我尝试过的:使用条件变量来实现二进制信号量。这是我的伪代码:
class BinarySem
{
BinarySem();
bool given;
mutex m;
condition_var cv;
give();
take();
take( Timeout T );
tryTake();
flush();
}
BinarySem::BinarySem()
: given(false)
{}
// take(Timeout T), tryTake() not shown
// to make question concise on StackOverflow
BinarySem::give()
{
{
lock_guard lk(m);
given = true;
}
cv.notify_one();
}
BinarySem::flush()
{
{
lock_guard lk(m);
given …Run Code Online (Sandbox Code Playgroud) std::atomic<T>两者std::condition_variable都有成员wait和notify_one功能。在某些应用程序中,程序员可以选择使用其中之一来实现同步目的。这些wait函数的目标之一是它们应与操作系统协调以最大程度地减少虚假唤醒。也就是说,操作系统应该避免唤醒wait-ing 线程,直到notify_one或notify_all被调用。
在我的机器上,sizeof(std::atomic<T>)issizeof(T)和sizeof(std::condition_variable)is 72。如果排除std::atomic<T>的T成员,则std::condition_variable保留 72 字节用于其同步目的,同时sizeof(std::atomic<T>)保留 0 字节。
我的问题:我应该期望std::condition_variables 和std::atomic<T>swait函数之间有不同的行为吗?例如,是否应该std::condition_variable减少虚假唤醒?