无锁堆栈 - 这是c ++ 11宽松原子的正确用法吗?可以证明吗?

Mic*_*nda 10 c++ multithreading atomic c++11 relaxed-atomics

我为一个非常简单的数据写了一个容器,需要跨线程同步.我想要最好的表现.我不想使用锁.

我想用"放松"的原子.部分是为了一点额外的魅力,部分是为了真正理解它们.

我一直在研究这个问题,而且我正处于这个代码通过我抛出的所有测试的地步.但这并不是"证据",所以我想知道是否有任何我遗漏的东西,或者我可以测试的其他任何方式?

这是我的前提:

  • 唯一重要的是正确地推送和弹出节点,并且堆栈永远不会失效.
  • 我相信内存中的操作顺序只在一个地方很重要:
    • 在compare_exchange操作之间.这是有保证的,即使是轻松的原子.
  • 通过向指针添加标识号来解决"ABA"问题.在32位系统上,这需要双字compare_exchange,而在64位系统上,未使用的16位指针用id号填充.
  • 因此:堆栈将始终处于有效状态. (对?)

这就是我在想的."通常",我们对我们正在阅读的代码进行推理的方式是查看它所编写的顺序.内存可以被读取或写入"乱序",但不能使程序的正确性无效.

这在多线程环境中发生了变化.这就是内存栅栏的用途 - 这样我们仍然可以查看代码并能够推断它是如何工作的.

所以,如果一切都可以在这里完全失序,我在放松原子的做法是什么?这不是有点太远吗?

我不这么认为,但这就是我在这里寻求帮助的原因.

compare_exchange操作本身可以保证彼此之间具有连续的恒定性.

读取或写入原子的唯一另一个时间是在compare_exchange之前获取头部的初始值.它被设置为变量初始化的一部分.据我所知,这个操作是否带回了"适当的"值是无关紧要的.

当前代码:

struct node
{
    node *n_;
#if PROCESSOR_BITS == 64
    inline constexpr node() : n_{ nullptr }                 { }
    inline constexpr node(node* n) : n_{ n }                { }
    inline void tag(const stack_tag_t t)                    { reinterpret_cast<stack_tag_t*>(this)[3] = t; }
    inline stack_tag_t read_tag()                           { return reinterpret_cast<stack_tag_t*>(this)[3]; }
    inline void clear_pointer()                             { tag(0); }
#elif PROCESSOR_BITS == 32
    stack_tag_t t_;
    inline constexpr node() : n_{ nullptr }, t_{ 0 }        { }
    inline constexpr node(node* n) : n_{ n }, t_{ 0 }       { }
    inline void tag(const stack_tag_t t)                    { t_ = t; }
    inline stack_tag_t read_tag()                           { return t_; }
    inline void clear_pointer()                             { }
#endif
    inline void set(node* n, const stack_tag_t t)           { n_ = n; tag(t); }
};

using std::memory_order_relaxed;
class stack
{
public:
    constexpr stack() : head_{}{}
    void push(node* n)
    {
        node next{n}, head{head_.load(memory_order_relaxed)};
        do
        {
            n->n_ = head.n_;
            next.tag(head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));
    }

    bool pop(node*& n)
    {
        node clean, next, head{head_.load(memory_order_relaxed)};
        do
        {
            clean.set(head.n_, 0);

            if (!clean.n_)
                return false;

            next.set(clean.n_->n_, head.read_tag() + 1);
        } while (!head_.compare_exchange_weak(head, next, memory_order_relaxed, memory_order_relaxed));

        n = clean.n_;
        return true;
    }
protected:
    std::atomic<node> head_;
};
Run Code Online (Sandbox Code Playgroud)

与其他人相比,这个问题有什么不同?放松的原子.他们对这个问题产生了很大的影响.

所以你怎么看?有什么我想念的吗?

Cas*_*sey 4

pushnode->_next已损坏,因为失败后您没有更新compareAndSwap。当下一次尝试成功时,您最初存储的节点可能node->setNext已被另一个线程从堆栈顶部弹出compareAndSwap。结果,某个线程认为它已从堆栈中弹出一个节点,但该线程已将其放回堆栈中。它应该是:

void push(Node* node) noexcept
{
    Node* n = _head.next();
    do {
        node->setNext(n);
    } while (!_head.compareAndSwap(n, node));
}
Run Code Online (Sandbox Code Playgroud)

另外,由于nextsetNextuse memory_order_relaxed,不能保证_head_.next()这里返回最近推送的节点。有可能从堆栈顶部泄漏节点。显然也存在同样的问题pop_head.next()可能会返回以前位于堆栈顶部但不再位于堆栈顶部的节点。如果返回值为nullptr,当堆栈实际上不为空时,可能会无法弹出。

pop如果两个线程尝试同时从堆栈中弹出最后一个节点,也可能出现未定义的行为。它们都看到相同的 值_head.next(),一个线程成功完成弹出操作。另一个线程进入 while 循环 - 因为观察到的节点指针不是nullptr- 但compareAndSwap循环很快将其更新为,nullptr因为堆栈现在为空。在循环的下一次迭代中,该 nullptr 被取消引用以获取其_next指针,并且随之而来的是许多欢闹。

pop显然也患有ABA。两个线程可以看到堆栈顶部的同一个节点。假设一个线程到达评估_next指针的位置,然后阻塞。另一个线程成功弹出该节点,推送 5 个新节点,然后在另一个线程唤醒之前再次推送该原始节点。另一个线程compareAndSwap将成功 - 堆栈顶部节点是相同的 - 但将旧_next值存储到_head而不是新值中。另一个线程推送的五个节点全部泄漏。情况也会如此memory_order_seq_cst