Mic*_*nda 10 c++ multithreading atomic c++11 relaxed-atomics
我为一个非常简单的数据写了一个容器,需要跨线程同步.我想要最好的表现.我不想使用锁.
我想用"放松"的原子.部分是为了一点额外的魅力,部分是为了真正理解它们.
我一直在研究这个问题,而且我正处于这个代码通过我抛出的所有测试的地步.但这并不是"证据",所以我想知道是否有任何我遗漏的东西,或者我可以测试的其他任何方式?
这是我的前提:
这就是我在想的."通常",我们对我们正在阅读的代码进行推理的方式是查看它所编写的顺序.内存可以被读取或写入"乱序",但不能使程序的正确性无效.
这在多线程环境中发生了变化.这就是内存栅栏的用途 - 这样我们仍然可以查看代码并能够推断它是如何工作的.
所以,如果一切都可以在这里完全失序,我在放松原子的做法是什么?这不是有点太远吗?
我不这么认为,但这就是我在这里寻求帮助的原因.
compare_exchange操作本身可以保证彼此之间具有连续的恒定性.
读取或写入原子的唯一另一个时间是在compare_exchange之前获取头部的初始值.它被设置为变量初始化的一部分.据我所知,这个操作是否带回了"适当的"值是无关紧要的.
当前代码:
struct node
{
node *n_;
#if PROCESSOR_BITS == 64
inline constexpr node() : n_{ nullptr } { }
inline constexpr node(node* n) : n_{ n } { }
inline void tag(const stack_tag_t t) { reinterpret_cast<stack_tag_t*>(this)[3] = t; }
inline stack_tag_t read_tag() { return reinterpret_cast<stack_tag_t*>(this)[3]; }
inline void clear_pointer() { tag(0); }
#elif PROCESSOR_BITS == 32
stack_tag_t t_;
inline constexpr node() : n_{ nullptr }, t_{ 0 } { }
inline constexpr node(node* n) : n_{ n }, t_{ 0 } { }
inline void tag(const stack_tag_t t) { t_ = t; }
inline stack_tag_t read_tag() { return t_; }
inline void clear_pointer() { }
#endif
inline void set(node* n, const stack_tag_t t) { n_ = n; tag(t); }
};
using std::memory_order_relaxed;
class stack
{
public:
constexpr stack() : head_{}{}
void push(node* n)
{
node next{n}, head{head_.load(memory_order_relaxed)};
do
{
n->n_ = head.n_;
next.tag(head.read_tag() + 1);
} while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
}
bool pop(node*& n)
{
node clean, next, head{head_.load(memory_order_relaxed)};
do
{
clean.set(head.n_, 0);
if (!clean.n_)
return false;
next.set(clean.n_->n_, head.read_tag() + 1);
} while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
n = clean.n_;
return true;
}
protected:
std::atomic<node> head_;
};
Run Code Online (Sandbox Code Playgroud)
与其他人相比,这个问题有什么不同?放松的原子.他们对这个问题产生了很大的影响.
所以你怎么看?有什么我想念的吗?
push
node->_next
已损坏,因为失败后您没有更新compareAndSwap
。当下一次尝试成功时,您最初存储的节点可能node->setNext
已被另一个线程从堆栈顶部弹出compareAndSwap
。结果,某个线程认为它已从堆栈中弹出一个节点,但该线程已将其放回堆栈中。它应该是:
void push(Node* node) noexcept
{
Node* n = _head.next();
do {
node->setNext(n);
} while (!_head.compareAndSwap(n, node));
}
Run Code Online (Sandbox Code Playgroud)
另外,由于next
和setNext
use memory_order_relaxed
,不能保证_head_.next()
这里返回最近推送的节点。有可能从堆栈顶部泄漏节点。显然也存在同样的问题pop
:_head.next()
可能会返回以前位于堆栈顶部但不再位于堆栈顶部的节点。如果返回值为nullptr
,当堆栈实际上不为空时,可能会无法弹出。
pop
如果两个线程尝试同时从堆栈中弹出最后一个节点,也可能出现未定义的行为。它们都看到相同的 值_head.next()
,一个线程成功完成弹出操作。另一个线程进入 while 循环 - 因为观察到的节点指针不是nullptr
- 但compareAndSwap
循环很快将其更新为,nullptr
因为堆栈现在为空。在循环的下一次迭代中,该 nullptr 被取消引用以获取其_next
指针,并且随之而来的是许多欢闹。
pop
显然也患有ABA。两个线程可以看到堆栈顶部的同一个节点。假设一个线程到达评估_next
指针的位置,然后阻塞。另一个线程成功弹出该节点,推送 5 个新节点,然后在另一个线程唤醒之前再次推送该原始节点。另一个线程compareAndSwap
将成功 - 堆栈顶部节点是相同的 - 但将旧_next
值存储到_head
而不是新值中。另一个线程推送的五个节点全部泄漏。情况也会如此memory_order_seq_cst
。