我遇到一个问题,我很难判断应该使用哪个同步原语。
我正在创建 n 个在内存区域上工作的并行线程,每个线程都分配给该区域的特定部分,并且可以独立于其他线程完成其任务。在某些时候,我需要收集所有线程的工作结果,这是使用屏障的一个很好的例子,这就是我正在做的事情。
我必须使用 n 个工作线程之一来收集其所有工作的结果,为此,我在线程函数中的计算代码后面添加了以下代码:
if (pthread_barrier_wait(thread_args->barrier)) {
// Only gets called on the last thread that goes through the barrier
// This is where I want to collect the results of the worker threads
}
Run Code Online (Sandbox Code Playgroud)
到目前为止一切顺利,但现在我陷入困境:上面的代码处于循环中,因为我希望线程在一定数量的循环旋转中再次完成工作。这个想法是,每次pthread_barrier_wait解除阻塞都意味着所有线程都已完成其工作,并且循环/并行工作的下一次迭代可以再次开始。
这样做的问题是,在其他线程再次开始处理该区域之前,不能保证结果收集器块语句的执行,因此存在竞争条件。我正在考虑使用这样的 UNIX 条件变量:
// This code is placed in the thread entry point function, inside
// a loop that also contains the code doing the parallel
// processing code.
if (pthread_barrier_wait(thread_args->barrier)) {
// We lock the mutex
pthread_mutex_lock(thread_args->mutex);
collectAllWork(); …Run Code Online (Sandbox Code Playgroud) 我在cppreference上找到的信息在这方面很模糊,所以我在这里询问。假设我有两个线程正在等待一个条件,其中一个具有 true 谓词,另一个具有 false 谓词(例如condition.wait(lock, [=]{ return some_condition; })。主线程决定用 随机通知其中一个cond.notify_one()。
假设所选择的等待线程是谓词为假的线程。该线程是否会隐式通知下一个线程(如果还有剩余),或者注定要等待直到虚假唤醒?
如果只有一个线程被唤醒,无论其条件成功还是失败,第一个线程尝试唤醒下一个线程以获得保证成功的通知的好方法是什么?一个天真的修复:
condition.wait(lock, [=] {
if (!some_condition) condition.notify_one();
return some_condition;
});
Run Code Online (Sandbox Code Playgroud)
除了悲观之外,“通知波”可能会重复通知相同的线程,这是无效的+在没有线程具有成功谓词的情况下永远不会停止。Anotify_all()不起作用,因为我们可能会意外地结束唤醒满足条件的多个线程,同时我们最多只希望一个线程通过。
使用 c++11。既然std::notify_all会导致虚假唤醒,那么为什么std::notify_all会保留而不是std::notify_one一直存在呢?std::notify_one 顺便说一句,可能会导致虚假唤醒吗?
阐述一下我的疑惑:
当我调用std::condition_variable.wait/wait_for/wait_untiland时std::notify_XXX,我的目的通常是实现线程同步。也就是说,更多的线程阻塞等待,直到另一个线程通知只有其中一个线程解除阻塞。
那么我就可以调用notify_one来实现这一点,但是为什么还有另一个notify_all,它的目的是什么,或者notify_all适合什么情况?在我的情况下,当我调用 时notify_all,它会唤醒所有等待线程,然后只有一个线程真正解除阻塞,而其他线程仍然阻塞,这是否称为虚假唤醒?如果notify_one也会调用虚假唤醒?
我在网上找到了这段代码,解释了如何使用 astd::condition_variable来解决生产者-消费者问题:Producer-Consumer Problem using Condition Variable in C++
#include <condition_variable> // std::condition_variale
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
std::mutex g_mutex;
std::condition_variable g_cv;
bool g_ready = false;
int g_data = 0;
int produceData() {
int randomNumber = rand() % 1000;
std::cout << "produce data: " << randomNumber << "\n";
return randomNumber;
}
void consumeData(int data) { std::cout << "receive data: " << data << "\n"; }
void consumer() {
int data = 0; …Run Code Online (Sandbox Code Playgroud) 在此示例中将唤醒多少个等待线程:
第一个帖子:
void wakeUp2Threads()
{
std::unique_lock<std::mutex> lock(condvar_mutex);
condvar.notify_one();
condvar.notify_one();
}
Run Code Online (Sandbox Code Playgroud)
第二线程:
{
std::unique_lock<std::mutex> lock(condvar_mutex);
condvar.wait(lock); <- 2nd thread has entered here before 1st thread entered wakeUp2Threads.
}
Run Code Online (Sandbox Code Playgroud)
第3个帖子(与第2 个帖子相同):
{
std::unique_lock<std::mutex> lock(condvar_mutex);
condvar.wait(lock); <- 3rd thread has entered here before 1st thread entered wakeUp2Threads.
}
Run Code Online (Sandbox Code Playgroud)
有没有保证在这个例子中两个通知都会被传递到不同的线程,而不是多次传递给同一个线程?
即notify_one()是什么意思:
1) notify one thread, no matter has it been already notified (but has not been woken up yet), or not. (* see note)
or
2) notify one thread, but …Run Code Online (Sandbox Code Playgroud) 我想确定我理解条件变量是如何工作的,所以我将使用我写的程序来问我的问题.
在我的程序中,我有一个"生产者"线程(一个)和"工作线程"(几个让我们假设3).
生产者线程"处理"一个FIFO链表,这意味着它所做的只是检查列表开头是否有一个项(在我的程序类型请求中调用Req)(由一个名为front的全局指针指向)在我的程序中)如果是这样,将它分配到一个全局请求元素(称为globalReq).
工作线程,在一个循环中运行,等待处理请求,通过提取全局请求变量,将它们变为自己的局部变量(对于它们中的每一个都是"私有"的,因为每个线程都有一个独立的堆栈 - 正确我,如果我错了),然后处理请求.
为了做到这一点,我使用互斥量和条件变量.
一个重要的注意事项是,一旦请求存在(暂时让我们假设只存在一个请求),那么工作线程中的哪一个将"关注"它(假设它们都是"自由的" - 睡觉并不重要在条件变量上).
在提取请求并将其分配到全局请求之后,生成器线程调用pthread_cond_signal- 据我所知,解锁至少一个"阻塞"线程 - >因此它可以解除阻塞,例如2个线程.
所以我的问题是我现有的代码(如下):
1)我怎样才能保证只有一个线程(来自工作线程)将处理请求.我是否需要在所有通用"生产者消费者"实施中添加"while check loop"?
2)如何通过pthread_cond_broadcast(或者如果一个pthread_cond_signal未被阻塞的多个线程)通过互斥锁上的内容解除阻塞的线程,可能我还没有掌握它...
(每个)工作线程的代码是:
void *worker(void *arg)
{
while(1)
{
printf("\n BEFORE LOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
pthread_mutex_lock(&sthread_mutex);
printf("\n AFTER UNLOCKING sthread_mutex with thread: %d \n", syscall(SYS_gettid));
printf("\n BEFORE WAITING ON cond_var with thread: %d \n", …Run Code Online (Sandbox Code Playgroud) 为什么在下面的代码段,并在模板类型不能自动从最后一个参数就像在做推断,std::condition_variable::wait?
template< typename Predicate >
//requires Truth< Predicate >
class lock_monitor_guard
{
public:
lock_monitor_guard( std::mutex& mutex, std::condition_variable& monitor, Predicate predicate );
~lock_monitor_guard();
private:
std::unique_lock<std::mutex> lock;
std::condition_variable& monitor;
};
template< typename Predicate >
//requires Truth< Predicate >
lock_monitor_guard<Predicate>::lock_monitor_guard( std::mutex& mutex, std::condition_variable& monitor, Predicate predicate )
: lock( mutex ), monitor( monitor )
{
monitor.wait<Predicate>( lock, predicate );
}
template< typename Predicate >
//requires Truth< Predicate >
lock_monitor_guard<Predicate>::~lock_monitor_guard()
{
lock.unlock();
monitor.notify_one();
}
Run Code Online (Sandbox Code Playgroud)
当我尝试构建类似的行时lock_monitor_guard guard( jobs_mutex, jobs_monitor, ([]()->bool{return true;}) ); …
我有一个后台线程用于上传文件.它循环运行; 它做了一些工作,然后睡觉直到超时过去或直到通过条件变量明确通知有更多的工作要做.问题是有时候我无法让线程快速退出.
这是一个简化版本:
std::thread g_thread;
std::mutex g_mutex;
std::condition_variable g_cond;
bool g_stop = false;
void threadLoop()
{
while (!g_stop)
{
printf("doing some stuff\n");
std::unique_lock<std::mutex> lock(g_mutex);
g_cond.wait_for(lock, std::chrono::seconds(15));
}
}
int main(int argc, char* argv[])
{
g_stop = false;
g_thread = std::thread(threadLoop);
printf("hello\n");
g_stop = true;
g_cond.notify_one();
g_thread.join();
}
Run Code Online (Sandbox Code Playgroud)
当我运行这个测试程序时,我希望它能够快速退出,但有时它会卡在wait_for()中.我想也许在线程在wait_for()中休眠之前发生了notify_one(),但是在检查了g_stop之后.
是否有一个简单的解决方案,或其他更好的设计模式?
你好,
我是C++的新手,但我有6年的Java经验,2年的C经验和一些并发基础知识.我正在尝试创建一个线程池来处理任务.它位于相关测试主体的下方.
似乎错误是由...生成的
void ThreadPool::ThreadHandler::enqueueTask(void (*task)(void)) {
std::lock_guard<std::mutex> lock(queueMutex);
Run Code Online (Sandbox Code Playgroud)
正如我的调试器所说,但是做传统的cout调试,我发现有时候它可以在没有segfaulting和删除的情况下工作
threads.emplace(handler->getSize(), handler);
Run Code Online (Sandbox Code Playgroud)
从ThreadPool::enqueueTask()大大提高稳定性.
总的来说,我认为这与我对condition_variable(称为idler)的错误使用有关.
编译器:CLION中的minGW-w64
的.cpp
#include <iostream>
#include "ThreadPool.h"
ThreadPool::ThreadHandler::ThreadHandler(ThreadPool *parent) : parent(parent) {
thread = std::thread([&]{
while (this->parent->alive){
if (getSize()){
std::lock_guard<std::mutex> lock(queueMutex);
(*(queue.front()))();
queue.pop_front();
} else {
std::unique_lock<std::mutex> lock(idlerMutex);
idler.wait(lock);
}
}
});
}
void ThreadPool::ThreadHandler::enqueueTask(void (*task)(void)) {
std::lock_guard<std::mutex> lock(queueMutex);
queue.push_back(task);
idler.notify_all();
}
size_t ThreadPool::ThreadHandler::getSize() {
std::lock_guard<std::mutex> lock(queueMutex);
return queue.size();
}
void ThreadPool::enqueueTask(void (*task)(void)) {
std::lock_guard<std::mutex> lock(threadsMutex);
std::map<int, ThreadHandler*>::iterator iter = threads.begin();
threads.erase(iter);
ThreadHandler *handler = …Run Code Online (Sandbox Code Playgroud) 我需要一个使用notify_all()方法的示例。因为我不知道它应该如何工作。
每个等待线程均以如下代码开头:
std::unique_lock<std::mutex> lock(mutex);
condition_variable.wait(lock, [](){return SOMETHING;});
Run Code Online (Sandbox Code Playgroud)
从一开始,等待线程就需要获取互斥体。因此,如果有多个等待线程,则其余线程将等待锁定互斥锁。那么,如果等待线程卡在锁定互斥锁上并且根本不执行方法wait(),那么使用notify_all()的目的是什么?这些线程将一个接一个地唤醒,而不是同时唤醒。