Der*_*unk 7 c concurrency locking c99
有谁知道在C中实现Peterson的Lock算法的好/正确?我似乎无法找到这个.谢谢.
Peterson的算法无法在C99中正确实现,正如在x86上订购内存栅栏的人所解释的那样.
彼得森的算法如下:
LOCK:
interested[id] = 1 interested[other] = 1
turn = other turn = id
while turn == other while turn == id
and interested[other] == 1 and interested[id] == 1
UNLOCK:
interested[id] = 0 interested[other] = 0
Run Code Online (Sandbox Code Playgroud)
这里有一些隐藏的假设.首先,每个线程必须注意它在获得锁定之前获得锁定的兴趣.放弃转弯必须使我们有兴趣获得锁定的其他线程可见.
此外,与每次锁定一样,临界区中的内存访问不能通过lock()调用提升,也不能通过unlock()沉没.即:lock()必须至少具有获取语义,而unlock()必须至少具有释放语义.
在C11中,实现这一目标的最简单方法是使用顺序一致的内存顺序,这使得代码运行就好像它是按程序顺序运行的线程的简单交错(警告:完全未经测试的代码,但它类似于一个示例在Dmitriy V'jukov的Relacy Race Detector中):
lock(int id)
{
atomic_store(&interested[id], 1);
atomic_store(&turn, 1 - id);
while (atomic_load(&turn) == 1 - id
&& atomic_load(&interested[1 - id]) == 1);
}
unlock(int id)
{
atomic_store(&interested[id], 0);
}
Run Code Online (Sandbox Code Playgroud)
这可确保编译器不会进行破坏算法的优化(通过在原子操作中提升/下载加载/存储),并发出适当的CPU指令以确保CPU也不会破坏算法.没有明确选择内存模型的C11/C++ 11原子操作的默认内存模型是顺序一致的内存模型.
C11/C++ 11还支持较弱的内存模型,允许尽可能多的优化.以下是由Anthony Williams编写的C++ 11译本C11的翻译,该算法最初由Dmitriy V'jukov在他自己的Relacy Race Detector [petersons_lock_with_C++ 0x_atomics] [the- inscrutable -c-memory ]的语法中编写. -model].如果这个算法不正确,那就是我的错(警告:还有未经测试的代码,但基于Dmitriy V'jukov和Anthony Williams的优秀代码):
lock(int id)
{
atomic_store_explicit(&interested[id], 1, memory_order_relaxed);
atomic_exchange_explicit(&turn, 1 - id, memory_order_acq_rel);
while (atomic_load_explicit(&interested[1 - id], memory_order_acquire) == 1
&& atomic_load_explicit(&turn, memory_order_relaxed) == 1 - id);
}
unlock(int id)
{
atomic_store_explicit(&interested[id], 0, memory_order_release);
}
Run Code Online (Sandbox Code Playgroud)
注意与获取和释放语义的交换.交换是一种原子RMW操作.原子RMW操作始终读取在RMW操作中写入之前存储的最后一个值.此外,对原子对象的获取从同一原子对象上的发行版读取写入(或者从执行发布的线程或任何后来从任何原子RMW操作写入的任何后续对该对象的写入)创建同步 - 与释放与获得之间的关系.
因此,此操作是线程之间的同步点,在一个线程中的交换与任何线程执行的最后一次交换之间始终存在同步关系(或者对于第一次交换,初始化为turn).
因此,我们在商店interested[id]
与交换之间存在一个先前的顺序关系,来自/的turn两个连续交换之间的同步关系turn,以及来自/到的交换turn与负载之间的顺序关系interested[1 - id].这相当于interested[x]在不同线程中访问之间发生之前的关系,并turn提供线程之间的同步.这会强制使算法运行所需的所有顺序.
那么在C11之前这些事情是如何完成的?它涉及使用编译器和CPU特定的魔法.作为一个例子,让我们看看非常强烈有序的x86.IIRC,所有x86加载都具有获取语义,并且所有存储都具有释放语义(在SSE中保存非时间移动,精确地用于实现更高的性能,代价是偶尔需要发出CPU围栏以实现CPU之间的一致性).但这对于Peterson的算法来说还不够,因为Bartosz Milewsky解释了在
x86上的有序内存防护,为了使Peterson的算法能够工作,我们需要在访问之间建立一个排序,turn并且interested不会这样做可能会导致在看到interested[1 - id]写入之前的负载时interested[id],这是一件坏事.
因此,在GCC/x86中这样做的方法是(警告:虽然我测试了类似于以下内容的东西,实际上是错误的-petersons-algorithm执行的代码的修改版本,测试无法确保多线程的正确性代码):
lock(int id)
{
interested[id] = 1;
turn = 1 - id;
__asm__ __volatile__("mfence");
do {
__asm__ __volatile__("":::"memory");
} while (turn == 1 - id
&& interested[1 - id] == 1);
}
unlock(int id)
{
interested[id] = 0;
}
Run Code Online (Sandbox Code Playgroud)
在MFENCE从防止存储和加载到不同的存储器地址被重新排序.否则,写入interested[id]可以在存储缓冲区中排队,同时加载interested[1 - id].在许多当前的微体系结构中,a SFENCE可能就足够了,因为它可以实现为存储缓冲器漏极,但是IIUC SFENCE不需要以这种方式实现,并且可以简单地防止存储之间的重新排序.所以SFENCE到处都可能不够,我们需要一个完整的MFENCE.
编译器barrier(__asm__ __volatile__("":::"memory"))阻止编译器确定它已经知道它的值turn.我们告诉编译器我们已经破坏了内存,因此必须从内存重新加载缓存在寄存器中的所有值.
PS:我觉得这需要一个结束段落,但我的大脑已经耗尽了.
我不会对实现的良好程度或正确性做出任何断言,但已对其进行了测试(简要介绍)。这是维基百科上描述的算法的直接翻译。
struct petersonslock_t {
volatile unsigned flag[2];
volatile unsigned turn;
};
typedef struct petersonslock_t petersonslock_t;
petersonslock_t petersonslock () {
petersonslock_t l = { { 0U, 0U }, ~0U };
return l;
}
void petersonslock_lock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
l->flag[p] = 1;
l->turn = !p;
while (l->flag[!p] && (l->turn == !p)) {}
};
void petersonslock_unlock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
l->flag[p] = 0;
};
Run Code Online (Sandbox Code Playgroud)
Greg指出,在具有稍微宽松的内存一致性的SMP架构上(例如x86),尽管对同一内存位置的加载是按顺序进行的,但一个处理器上不同位置的加载可能对另一处理器而言则是混乱的。
Jens Gustedt和ninjalj建议修改原始算法以使用该atomic_flag类型。这意味着设置标志和转弯将使用atomic_flag_test_and_set和atomic_flag_clear从C11中清除它们。或者,可以在更新之间施加存储屏障flag。
编辑:我最初试图通过写入所有状态的相同存储位置来纠正此问题。ninjalj指出,按位运算将状态运算转换为RMW而不是原始算法的加载和存储。因此,需要原子按位运算。C11和内置的GCC都提供了这样的运算符。下面的算法使用GCC内置函数,但包装在宏中,因此可以轻松地将其更改为其他实现。但是,修改上面的原始算法是首选解决方案。
struct petersonslock_t {
volatile unsigned state;
};
typedef struct petersonslock_t petersonslock_t;
#define ATOMIC_OR(x,v) __sync_or_and_fetch(&x, v)
#define ATOMIC_AND(x,v) __sync_and_and_fetch(&x, v)
petersonslock_t petersonslock () {
petersonslock_t l = { 0x000000U };
return l;
}
void petersonslock_lock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
unsigned mask = (p == 0) ? 0xFF0000 : 0x00FF00;
ATOMIC_OR(l->state, (p == 0) ? 0x000100 : 0x010000);
(p == 0) ? ATOMIC_OR(l->state, 0x000001) : ATOMIC_AND(l->state, 0xFFFF00);
while ((l->state & mask) && (l->state & 0x0000FF) == !p) {}
};
void petersonslock_unlock (petersonslock_t *l, int p) {
assert(p == 0 || p == 1);
ATOMIC_AND(l->state, (p == 0) ? 0xFF00FF : 0x00FFFF);
};
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3388 次 |
| 最近记录: |