use*_*869 7 c++ atomic memory-barriers barrier stdatomic
我在《C++ Concurrency in Action》一书中遇到了这段代码,用于简单实现屏障(对于不能std::experimental::barrier在 C++17 或std::barrierC++20 中使用的代码)。
[编辑]屏障是一种同步机制,其中一组线程(线程数传递给屏障的构造函数)可以到达并等待(通过调用 wait 方法)或到达并丢弃(通过调用 did_waiting) 。如果组中的所有线程都到达屏障,则屏障将被重置,并且线程可以继续执行下一组操作。如果组中的某些线程脱落,则组中的线程数量相应减少,以进行下一轮与屏障的同步。[编辑结束]
以下是为简单实现屏障而提供的代码。
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):count(count_),spaces(count_),generation(0)
{}
void wait(){
unsigned const gen=generation.load();
if(!--spaces){
spaces=count.load();
++generation;
}else{
while(generation.load()==gen){
std::this_thread::yield();
}
}
}
void done_waiting(){
--count;
if(!--spaces){
spaces=count.load();
++generation;
}
}
};
Run Code Online (Sandbox Code Playgroud)
作者 Anthony Williams 提到,他选择顺序一致性排序是为了更容易推理代码,并表示可以使用宽松的排序来提高代码效率。这就是我更改代码以采用宽松排序的方法。请帮助我理解我的代码是否正确。
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):count(count_),spaces(count_),generation(0)
{}
void wait(){
unsigned const gen=generation.load(std::memory_order_acquire);
if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
spaces=count.load(std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}else{
while(generation.load(std::memory_order_relaxed)==gen){
std::this_thread::yield();
}
}
}
void done_waiting(){
count.fetch_sub(1, std::memory_order_relaxed);
if(1 == spaces.fetch_sub(1, std::memory_order_relaxed)){
spaces=count.load(std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}
}
};
Run Code Online (Sandbox Code Playgroud)
道理是这样的。生成的增量是一个释放操作,与等待调用中生成的加载同步。这确保了从 count 到空间的加载对于所有调用 wait 并读取使用释放语义存储的 Generation 新值的线程都是可见的。
此后对空间的所有操作都是RMW操作,它们参与释放序列,因此可以是宽松的操作。这个推理是正确的还是这个代码是错误的?请帮助我理解。提前致谢。
[编辑]我尝试像这样使用我的屏障代码。
void fun(barrier* b){
std::cout << "In Thread " << std::this_thread::get_id() << std::endl;
b->wait();
std::cout << std::this_thread::get_id() << " First wait done" << std::endl;
b->wait();
std::cout << std::this_thread::get_id() << " Second wait done" << std::endl;
b->done_waiting();
}
int main(){
barrier b{2};
std::thread t(fun, &b);
fun(&b);
std::cout << std::this_thread::get_id() << " " << b.get_count() << std::endl;
t.join();
}
Run Code Online (Sandbox Code Playgroud)
我还尝试使用更多线程来测试它,并且在表面运行中,它似乎做了正确的事情。但我仍然想了解我的推理是否正确,或者我是否遗漏了一些非常明显的东西。[编辑结束]
我相信以下版本的代码具有最弱的顺序,但仍使其正确。
struct barrier
{
std::atomic<unsigned> count;
std::atomic<unsigned> spaces;
std::atomic<unsigned> generation;
barrier(unsigned count_):count(count_),spaces(count_),generation(0)
{}
void wait(){
unsigned const gen=generation.load(std::memory_order_relaxed);
if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
spaces.store(count.load(std::memory_order_relaxed),
std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}else{
while(generation.load(std::memory_order_acquire)==gen){
std::this_thread::yield();
}
}
}
void done_waiting(){
count.fetch_sub(1, std::memory_order_relaxed);
if(1 == spaces.fetch_sub(1, std::memory_order_acq_rel)){
spaces.store(count.load(std::memory_order_relaxed),
std::memory_order_relaxed);
generation.fetch_add(1, std::memory_order_release);
}
}
};
Run Code Online (Sandbox Code Playgroud)
在整个讨论中,当我们谈到特定代中到达屏障的“最后一个”线程时,我们指的是
spaces.fetch_sub(1)返回值 1 的唯一线程。
一般来说,如果一个线程正在创建一个承诺某些操作已完成的存储,那么它需要是一个发布存储。当另一个线程加载该值以获取操作完成的证据并且可以安全地使用结果时,则需要获取该加载。
现在,在手头的代码中,线程指示它已完成其“常规工作”(无论在调用wait()or
之前排序的内容done_waiting())的方式是通过递减spaces,即存储比前一个值小 1 的值。修改顺序。对于非最后一个线程,这是它所做的唯一存储。因此,spaces.fetch_sub()必须释放该商店。
对于最后一个线程,它知道自己是最后一个线程的方法是在 中加载值 1 spaces.fetch_sub()。这可以证明所有其他线程都已完成其常规工作,并且最后一个线程可以安全地删除屏障。所以spaces.fetch_sub()也需要获取。因此spaces.fetch_sub(1, std::memory_order_acq_rel)。
非最后线程确定屏障已关闭并且它们可以安全地继续进行的方式是通过从generationyield循环中加载一个值,并观察它与gen. 因此需要获取负载;它观察到的存储,即
generation.fetch_add(),需要被释放。
我认为这就是我们需要的所有障碍。last
线程完成的更新工作实际上是在它自己的小临界区中完成的,以 1 的获取负载开始
,以 1spaces的释放增量结束。此时,所有其他线程已经加载并存储了,每个调用的线程都已经加载了 的旧值
,并且每个调用的线程已经存储了 的新减量值。因此,最后一个线程可以通过宽松的顺序安全地操作它们,因为知道没有其他线程会这样做。countspacesgenerationspaceswaitgenerationdone_waiting()count
现在不需要
获取初始gen = generation.load()加载。它不能被推过商店到
,因为后者是发布的。因此,在存储值 1 之前,它肯定会被安全加载,并且只有在此之后,潜在的线程才会更新。wait()spacesspaceslastgeneration
现在让我们尝试给出一个正式的证明来证明这个新版本是正确的。看一下两个线程 A 和 B,它们执行以下操作:
barrier b(N); // N >= 2
void thrA() {
work_A();
b.wait(); // or b.done_waiting(), doesn't matter which
}
void thrB() {
b.wait();
work_B();
}
Run Code Online (Sandbox Code Playgroud)
还有N-2其他线程,每个线程都调用b.wait()
或b.done_waiting()。为简单起见,我们假设从第 0 代开始。
我们想证明这种work_A()情况发生在 之前work_B(),以便它们可以在相同的数据(冲突)上进行操作,而不会出现数据竞争。根据 B 是否是最后到达屏障的线程考虑两种情况。
假设B是最后一个线程。这意味着它从 中获取了值 1 的负载spaces。线程 A 必须释放存储某个 >= 1 的值,然后通过原子 RMW 操作(在其他线程中)连续递减该值,直到达到 1。这是一个以 A 的存储为首的释放序列,B 的加载从该序列中的最后一个副作用,因此 A 的存储与 B 的负载同步。通过排序,work_A()发生在 A 的存储之前,而 B 的加载发生在 之前work_B(),因此根据需要work_A()发生在之前
work_B()。
现在假设 B 不是最后一个线程。然后,仅当它从不同于 的值wait()
加载时才返回。让 L 表示实际的最后一个线程,它可能是 A。generationgen
我声称,作为第一步, B 中必须为 0。因为intogen的加载
发生在 B 的释放存储之前,并且如上所述,这导致了一个释放序列,该序列最终存储值 1(在第二个到 -最后一个线程)。L 中的负载从该副作用中获取其值,因此 B 的存储与 L 的负载同步。B 的加载
发生在其存储到 之前,L 的加载
发生在其存储 ( ) 到之前。所以 B 的加载发生在 L 的存储之前。通过读写一致性 [intro.races p17],B 的加载不得从 L 的存储中获取其值,而是从修改顺序中的某个较早的值获取。这必须为 0,因为没有对 进行其他修改。generationgenspaces.fetch_sub()spacesspacesgenerationspacesspacesfetch_add()generationgenerationgenerationgeneration
(如果我们在 G 代而不是 0 代工作,这只能证明gen <= G。但正如我在下面解释的那样,所有这些负载都发生在 之前的增量之后generation,这是值 G 存储的位置。因此这证明了相反的不等式gen >= G。 )
wait()因此 B仅当从 加载 1 时才返回generation。因此,最终的获取加载已从释放存储中获取其值并由generationL 完成,这表明 L 的存储发生在 B 返回之前。现在,L 的 1 加载spaces发生在其存储 to 之前generation(通过排序),而 A 的存储 tospaces发生在 L 的 1 加载之前,如前所述。(如果 L = A,则通过排序,相同的结论仍然成立。)我们现在有以下操作,完全按发生之前排序:
A: work_A();
A: store side of spaces.fetch_sub()
L: load of 1 in spaces.fetch_sub()
L: store side of generation.fetch_add()
B: acquire load of 1 from generation
B: work_B()
Run Code Online (Sandbox Code Playgroud)
并通过传递性得到想要的结论。(如果 L=A,则删除上面的第 2 行和第 3 行。)
我们可以类似地证明,在将 L 加载到 中之前count,各种调用中的所有递减都会发生,因此可以安全地放松这些。L 中的重新初始化以及 的增量都发生在任何线程从 中返回之前,因此即使这些线程被放宽,之后排序的任何屏障操作也会看到屏障被正确重置。done_waiting()countspacesspacesgenerationwait()
我认为这涵盖了所有所需的语义。
实际上,我们可以通过使用栅栏来进一步削弱一些东西。例如,acqinspaces.fetch_sub()仅仅是为了线程 L 的利益,它加载值 1;其他线程不需要它。所以我们可以这样做
if (1 == spaces.fetch_sub(1, std::memory_order_release)){
std::atomic_thread_fence(std::memory_order_acquire);
// ...
}
Run Code Online (Sandbox Code Playgroud)
那么只有线程L需要支付acquire的成本。这并不是说它真的很重要,因为所有其他线程无论如何都会休眠,所以我们不太可能关心它们是否很慢。
(本节是在我做出上面的修改版本之前写的。)
我相信至少有两个错误。
我们wait()自己来分析一下。考虑执行以下操作的代码:
int x = 0;
barrier b(2);
void thrA() {
x = 1;
b.wait();
}
void thrB() {
b.wait();
std::cout << x << std::endl;
}
Run Code Online (Sandbox Code Playgroud)
我们希望证明x = 1in thrA发生在in thrB的求值之前x,这样代码就不会出现数据竞争并被迫打印值1。
但我认为我们不能。假设 thrB 首先到达屏障,也就是说它观察到spaces.fetch_sub返回 2。因此每个线程中执行的加载和存储的顺序如下:
thrA:
x = 1;
gen=generation.load(std::memory_order_acquire); // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 1, stores 0
spaces=count.load(std::memory_order_relaxed); // stores 2
generation.fetch_add(1, std::memory_order_release); // returns 0, stores 1
thrB:
gen=generation.load(std::memory_order_acquire); // returns 0
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
generation.load(std::memory_order_relaxed); // returns 0
... many iterations
generation.load(std::memory_order_relaxed); // returns 1
x; // returns ??
Run Code Online (Sandbox Code Playgroud)
为了抱有希望,我们必须让AthrA 中的某些操作与BthrB 中的某些操作同步。B仅当是一个获取操作,该操作从以 为首的释放序列中的副作用获取其值时,这才是可能的A。但 thrB 中只有一个 acquire 操作,即初始generation.load(std::memory_order_acquire). 并且它不会0从 thrB 中的任何操作中获取其值(即 ),而是从generation任一线程启动之前发生的初始化中获取值。这个副作用不是任何有用的释放序列的一部分,当然也不是以 后发生的操作为首的任何释放序列的一部分x=1。所以我们的证明尝试失败了。
更非正式地,如果我们检查 的顺序thrB,我们会发现 的评估x可以在任何或所有宽松操作之前重新排序。事实上,它仅在generation.load(std::memory_order_relaxed)返回 1 时有条件地进行评估,这一事实并没有帮助;我们可以x更早地推测性地加载,并且该值仅在generation.load(std::memory_order_relaxed)finally返回1之后使用。所以我们所知道的是在返回0x之后的某个时间评估它generation.load(std::memory_order_acquire),这完全没有给我们任何关于thrA可能或可能不做什么的有用信息然后。
这个特定问题可以通过升级要获取的自旋循环中的负载generation,或者在循环退出之后但返回之前放置获取栅栏来解决wait()。
至于done_waiting,似乎也有问题。如果我们有
void thrA() {
x = 1;
b.done_waiting();
}
void thrB() {
b.wait();
std::cout << x;
}
Run Code Online (Sandbox Code Playgroud)
那么我们可能再次希望1在没有数据竞争的情况下被打印。但假设 thrA 首先到达障碍物。那么它所做的就是
x = 1;
count.fetch_sub(1, std::memory_order_relaxed); // irrelevant
spaces.fetch_sub(1, std::memory_order_relaxed); // returns 2, stores 1
Run Code Online (Sandbox Code Playgroud)
根本没有发布存储,因此它无法与 thrB 同步。
非正式地说,没有任何障碍可以防止商店x=1无限期延迟,因此不能保证 thrB 会遵守它。
现在已经很晚了,所以目前我将其作为如何解决此问题的练习。
顺便说一句,ThreadSanitizer 会检测这两种情况的数据争用: https: //godbolt.org/z/1MdbMbYob。我可能应该先尝试一下,但最初我不太清楚要实际测试什么。
目前我不确定这些是否是唯一的错误,或者是否还有更多错误。
| 归档时间: |
|
| 查看次数: |
194 次 |
| 最近记录: |