我在多线程上实现等待和信号条件时遇到了问题.
线程需要锁定互斥锁并等待条件变量,直到某个其他线程发出信号为止.同时,另一个线程锁定相同的互斥锁并等待相同的条件变量.现在,在整个过程中并发运行的线程发出条件变量的信号,但我只想要等待的第一个线程必须发出信号,而不是其他线程.
据我了解,在 Windows 上 CRITICAL_SECTION 只能用作非递归互斥锁。要获得递归互斥锁,您必须使用 OpenMutex 和朋友。
但是,AFAIU、Win32 Mutex 不能与条件变量一起使用(InitializeConditionVariable 等)
有没有办法在 Windows 上将递归互斥体与条件变量结合使用?
通常,如果我想模拟某些工作或等待确切的时间间隔,我会使用condition_variable::wait_for或者最坏的情况thread::this_thread::sleep_for.但condition_variable 文档说明wait_for或wait_until方法可能会阻止比请求更长的时间.
由于调度或资源争用延迟,此函数可能会阻塞超过timeout_duration.
可以保证多少精确的等待间隔?
UPDATE
我可以不用到达condition_variable吗?
我目前正在研究条件变量,我开始得到它.但是从这里的代码:
void print_id (int id) {
std::unique_lock<std::mutex> lck(mtx);
while (!ready) cv.wait(lck);
// ...
std::cout << "thread " << id << '\n';
}
void go() {
std::unique_lock<std::mutex> lck(mtx); <<<<<< ?
ready = true;
cv.notify_all();
}
Run Code Online (Sandbox Code Playgroud)
在print_id我理解的声明,lck因为它将被使用cv.wait().在go函数中,我不明白目的声明,lck因为它没有被使用.我尝试删除并运行,看起来很好.是真的有必要还是我错过了什么?
我正在尝试使用单个消费者和单个生产者实现缓冲区。我只使用了 POSIX 信号量,但是,它们在 Rust 中不可用,我正在尝试使用 Rust 同步原语 ( Mutex, Condvar, Barrier, ...)实现一个微不足道的信号量问题,但我不想使用通道。
我的代码表现得太不规则了,在某些情况下运行良好,有时它只是在某个数字处停止,而在其他情况下它只是不开始计数。
如果我在主线程中等待 1 秒直到我发送Condvar通知,事情似乎会更好,但它并不能保证它不会进入死锁。
如何修复这个程序?我理解Condvar错了吗?
use std::thread;
use std::sync::{Arc, Condvar, Mutex};
struct Buffer {
is_data: Mutex<bool>,
is_data_cv: Condvar,
is_space: Mutex<bool>,
is_space_cv: Condvar,
buffer: Mutex<i32>,
}
fn producer(buffer: Arc<Buffer>) {
for i in 0..50 {
loop {
let mut is_space = buffer
.is_space_cv
.wait(buffer.is_space.lock().unwrap())
.unwrap();
if *is_space {
{
let mut hueco = buffer.buffer.lock().unwrap();
*hueco = i;
}
*is_space = false;
{ …Run Code Online (Sandbox Code Playgroud) 我正在尝试编写一个非常简单的线程池来了解它们在幕后是如何工作的。不过,我遇到了问题。当我使用condition_variable并调用notify_all()时,它只会唤醒池中的一个线程。
其他一切都很好。我已经排队了 900 个工作,每个工作都有不错的负载。唤醒的一个线程会消耗所有这些作业,然后返回睡眠状态。在下一个循环中,这一切都会再次发生。
问题是只有一个线程完成这项工作!我怎么搞砸了这个模板?
线程池.h:
#pragma once
#include <mutex>
#include <stack>
#include <atomic>
#include <thread>
#include <condition_variable>
class ThreadPool
{
friend void __stdcall ThreadFunc();
public:
static ThreadPool & GetInstance()
{
static ThreadPool sInstance;
return (sInstance);
}
public:
void AddJob(Job * job);
void DoAllJobs();
private:
Job * GetJob();
private:
const static uint32_t ThreadCount = 8;
std::mutex JobMutex;
std::stack<Job *> Jobs;
volatile std::atomic<int> JobWorkCounter;
std::mutex SharedLock;
std::thread Threads[ThreadCount];
std::condition_variable Signal;
private:
ThreadPool();
~ThreadPool();
public:
ThreadPool(ThreadPool const &) = delete;
void operator …Run Code Online (Sandbox Code Playgroud) 我试图搜索如何std::conidition_variable::wait在本地计算机上的标准库中实现,我可以看到wait_unitl但找不到wait。
我的问题是,该wait函数是如何在内部实现的,如何使线程无限期地休眠,它是使用某种长时间的休眠还是特定于操作系统的完全不同的东西?
谢谢!
假设我有一个类 ThreadQueue 持有一个std::queue,并且我将它的一个实例传递std::ref给一个线程。进一步假设,线程 1(主线程)创建并持有 ThreadQueue 对象并将消息倒入其中,第二个线程的任务是接收这些消息并将它们放在某处,例如,将它们写入日志文件。
该类看起来像:
#include <queue>
#include <mutex>
#include <condition_variable>
using namespace std;
template <typename T>
class ThreadQueue
{
queue<T> q_;
mutex mtx;
unique_lock<mutex> lck;
condition_variable cv;
public:
ThreadQueue() { lck = unique_lock<mutex>(mtx); }
~ThreadQueue() { if (lck.owns_lock()) lck.unlock(); }
void enqueue (const T&);
T dequeue ();
};
template <typename T>
void ThreadQueue<T>::enqueue (const T& t)
{
lck.lock();
q_.push(t);
lck.unlock();
cv.notify_one();
}
template <typename T>
T ThreadQueue<T>::dequeue ()
{
cv.wait(lck);
lck.lock();
T t …Run Code Online (Sandbox Code Playgroud) 尝试在 Linux 下用 C++ 创建异步 I/O 文件读取器。我的例子有两个缓冲区。第一个读取块。然后,每次主循环时,我都会异步启动 IO 并调用process()它来运行当前块的模拟处理。处理完成后,我们等待条件变量。这个想法是异步处理程序应该通知条件变量。
不幸的是,这notify似乎发生在之前wait,而且这似乎不是条件变量wait()函数的工作方式。我应该如何重写代码,以便循环等待异步 io 完成?
#include <aio.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>
#include <condition_variable>
#include <cstring>
#include <iostream>
#include <thread>
using namespace std;
using namespace std::chrono_literals;
constexpr uint32_t blockSize = 512;
mutex readMutex;
condition_variable cv;
int fh;
int bytesRead;
void process(char* buf, uint32_t bytesRead) {
cout << "processing..." << endl;
usleep(100000);
}
void aio_completion_handler(sigval_t sigval) {
struct aiocb* req = (struct aiocb*)sigval.sival_ptr;
// check …Run Code Online (Sandbox Code Playgroud) 我在这里有一个简单的例子:
自从我尝试学习c ++ 11线程以来,该项目可称为学术性的.这是对正在发生的事情的描述.
想象一下,如果std::string有大量的汇编源代码,那就非常大
mov ebx,ecx;\r \nmov eax,ecx;\r \n ......
Parse()function接受此字符串并通过标记行的开头和结尾并将其保存string::const_iterators在作业队列中来查找所有行位置.
之后,2个工作线程从队列中弹出此信息,并将子字符串解析为Intstuction类对象.他们将结果的Instruction类实例推送到std::vector<Instruction> result
这是一个结构声明,用于保存要解析的子字符串的行号和迭代器
struct JobItem {
int lineNumber;
string::const_iterator itStart;
string::const_iterator itEnd;
};
Run Code Online (Sandbox Code Playgroud)
那是一个小记录器......
void ThreadLog(const char* log) {
writeMutex.lock();
cout << "Thr:" << this_thread::get_id() << " " << log << endl;
writeMutex.unlock();
}
Run Code Online (Sandbox Code Playgroud)
这是共享数据:
queue<JobItem> que;
vector<Instruction> result;
Run Code Online (Sandbox Code Playgroud)
以下是同步的所有原语
condition_variable condVar;
mutex condMutex;
bool signaled = false;
mutex writeMutex;
bool done=false;
mutex resultMutex;
mutex queMutex;
Run Code Online (Sandbox Code Playgroud)
每线程功能
void Func() {
unique_lock<mutex> condLock(condMutex); …Run Code Online (Sandbox Code Playgroud) c++ multithreading condition-variable thread-synchronization c++11
c++ ×7
c++11 ×3
mutex ×3
concurrency ×2
asynchronous ×1
c ×1
pthreads ×1
rust ×1
std ×1
threadpool ×1
wait ×1
windows ×1