sbi*_*sbi 39 c++ multithreading synchronization readwritelock
我们发现在代码中我们有几个位置,其中并发读取受互斥锁保护的数据是相当常见的,而写入很少见.我们的测量结果似乎表明,使用简单的互斥锁严重阻碍了读取数据的代码的性能.所以我们需要的是多读/单写互斥.我知道这可以建立在更简单的原语之上,但在我尝试自己之前,我宁愿要求现有的知识:
从简单的同步原语中构建多读/单写锁的批准方法是什么?
我确实知道如何制作它,但我宁愿通过我(可能是错误的)想出的答案而不偏不倚.(注意:我期望的是如何解释它,可能是伪代码,而不是完整的实现.我当然可以自己编写代码.)
注意事项:
这需要有合理的表现.(我想到的是每次访问需要两次锁定/解锁操作.现在可能不够好,但需要其中许多操作似乎是不合理的.)
通常,读取数量更多,但写入比读取更重要且性能更敏感.读者不能让作家们挨饿.
我们被困在一个相当古老的嵌入式平台(VxWorks 5.5的专有变体)上,有一个相当旧的编译器(GCC 4.1.2)和boost 1.52 - 除了大多数依赖于POSIX的boost部分,因为POSIX没有完全实现在那个平台上.可用的锁定原语基本上是几种信号量(二进制,计数等),我们已经在其上创建了互斥量,条件变量和监视器.
这是IA32,单核.
How*_*ant 21
乍一看,我认为我认为这个答案与Alexander Terekhov介绍的算法相同.但经过研究,我相信它是有缺陷的.两个作者可以同时等待m_exclusive_cond.当其中一个编写器唤醒并获得独占锁时,它将exclusive_waiting_blocked = false启动unlock,从而将互斥锁设置为不一致状态.在那之后,互斥锁可能会被冲洗掉.
首先提出的N2406std::shared_mutex包含部分实现,下面用更新的语法重复.
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
Run Code Online (Sandbox Code Playgroud)
该算法源自Alexander Terekhov的旧新闻组帖子.它既不是读者也不是作家.
有两个"门",gate1_和gate2_.读者和作者必须通过gate1_,并且在尝试这样做时可能会被阻止.一旦读者过去gate1_,它就会读取锁定互斥锁.读者可以拿过去gate1_,只要不存在具有所有权的读者的最大数量,并且只要一个作家没有得到过去gate1_.
一次只有一位作家可以过去gate1_.gate1_即使读者拥有所有权,作家也可以过去.但是一旦过去gate1_,作家仍然没有所有权.它必须先过去gate2_.在gate2_所有拥有所有权的读者放弃之后,作家才能过去.回想一下,gate1_当作家在等待时,新读者无法过去gate2_.gate1_当作家在等待时,新作家也不会过去gate2_.
读者和作者被阻止的特征gate1_(几乎)相同的要求通过它,这使得这种算法对读者和作者都公平,两者都不会挨饿.
互斥"状态"被有意地保持在一个单词中,以便表明对某些状态变化部分使用原子(作为优化)是可能的(即对于无争用的"快速路径").但是,此处未演示此优化.一个例子是如果一个编写器线程可以state_从0 原子地改变到write_entered那时他获得锁而不必阻塞甚至锁定/解锁mut_.并且unlock()可以使用原子库实现.等等.这里没有示出这些优化,因为它们比这个简单的描述使其听起来更难以正确实现.
qqi*_*row 13
看起来你只有mutex和condition_variable作为同步原语.因此,我在这里写了一个读写器锁,这让读者们感到沮丧.它使用一个互斥锁,两个conditional_variable和三个整数.
readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.
Run Code Online (Sandbox Code Playgroud)
它以这种方式使读者挨饿.如果有几位作家想写作,那么在所有作家完成写作之前,读者永远不会有机会阅读.这是因为后来的读者需要检查writers变量.同时,active_writers变量将保证一次只能写一个作者.
class RWLock {
public:
RWLock()
: shared()
, readerQ(), writerQ()
, active_readers(0), waiting_writers(0), active_writers(0)
{}
void ReadLock() {
std::unique_lock<std::mutex> lk(shared);
while( waiting_writers != 0 )
readerQ.wait(lk);
++active_readers;
lk.unlock();
}
void ReadUnlock() {
std::unique_lock<std::mutex> lk(shared);
--active_readers;
lk.unlock();
writerQ.notify_one();
}
void WriteLock() {
std::unique_lock<std::mutex> lk(shared);
++waiting_writers;
while( active_readers != 0 || active_writers != 0 )
writerQ.wait(lk);
++active_writers;
lk.unlock();
}
void WriteUnlock() {
std::unique_lock<std::mutex> lk(shared);
--waiting_writers;
--active_writers;
if(waiting_writers > 0)
writerQ.notify_one();
else
readerQ.notify_all();
lk.unlock();
}
private:
std::mutex shared;
std::condition_variable readerQ;
std::condition_variable writerQ;
int active_readers;
int waiting_writers;
int active_writers;
};
Run Code Online (Sandbox Code Playgroud)