无锁双向链表的原子操作

MRB*_*MRB 5 c++ multithreading atomic lock-free c++11

我正在根据这些论文编写一个无锁双向链表:

\n\n

“基于引用计数的高效可靠的无锁内存回收”\nAnders Gidenstam,IEEE 成员,Marina Papatriantafilou,H\xcb\x9a akan Sundell 和 Philippas Tsigas

\n\n

“无锁双端队列和双向链表”\nH\xc3\xa5kan Sundell,Philippas Tsigas

\n\n

对于这个问题我们可以先搁置第一篇论文。

\n\n

在本文中,他们使用了一种巧妙的方法来在单词中存储删除标志和指针。\n(更多信息请参见此处

\n\n

论文中这部分的伪代码:

\n\n
union Link\n    : word\n    (p,d): {pointer to Node, boolean} \n\nstructure Node\n    value: pointer to word\n    prev: union Link\n    next: union Link\n
Run Code Online (Sandbox Code Playgroud)\n\n

我的上述伪代码的代码:

\n\n
template< typename NodeT >\nstruct LockFreeLink\n{\npublic:\n    typedef NodeT NodeType;\n\nprivate:\n\nprotected:\n    std::atomic< NodeT* > mPointer;\n\npublic:\n    bcLockFreeLink()\n    {\n        std::atomic_init(&mPointer, nullptr);\n    }\n    ~bcLockFreeLink() {}\n\n    inline NodeType* getNode() const throw()\n    {\n        return std::atomic_load(&mPointer, std::memory_order_relaxed);\n    }\n    inline std::atomic< NodeT* >* getAtomicNode() const throw()\n    {\n        return &mPointer;\n    }\n};\n\nstruct Node : public LockFreeNode\n{\n    struct Link : protected LockFreeLink< Node >\n    {\n        static const int dMask = 1;\n        static const int ptrMask = ~dMask;\n\n        Link() { } throw()\n        Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()\n        { \n            std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); \n        }\n\n        Node* pointer() const throw() \n        { \n            return reinterpret_cast<Node*>(\n                std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); \n        }\n        bool del() const throw() \n        { \n            return std::atomic_load(&data, std::memory_order_relaxed) & dMask; \n        }\n        bool compareAndSwap(const Link& pExpected, const Link& pNew) throw() \n        { \n            Node* lExpected = std::atomic_load(&pExpected.mPointer, std::memory_order_relaxed);\n            Node* lNew = std::atomic_load(&pNew.mPointer, std::memory_order_relaxed);\n\n            return std::atomic_compare_exchange_strong_explicit(\n                &mPointer,\n                &lExpected,\n                lNew,\n                std::memory_order_relaxed,\n                std::memory_order_relaxed); \n        }\n\n        bool operator==(const Link& pOther) throw() \n        { \n            return std::atomic_load(data, std::memory_order_relaxed) == \n                std::atomic_load(pOther.data, std::memory_order_relaxed); \n        }\n        bool operator!=(const Link& pOther) throw() \n        { \n            return !operator==(pOther); \n        }\n    };\n\n    Link mPrev;\n    Link mNext;\n    Type mData;\n\n    Node() {};\n    Node(const Type& pValue) : mData(pValue) {};\n};\n
Run Code Online (Sandbox Code Playgroud)\n\n

本文有一个将链接删除标记设置为true的函数:

\n\n
procedure SetMark(link: pointer to pointer to Node)\n    while true do\n       node = *link;\n       if node.d = true or CAS(link, node, (node.p, true)) then break;\n
Run Code Online (Sandbox Code Playgroud)\n\n

我的这个函数的代码:

\n\n
void _setMark(Link* pLink)\n{\n    while (bcTRUE)\n    {\n        Link lOld = *pLink;\n        if(pLink->del() || pLink->compareAndSwap(lOld, Link(pLink->pointer(), bcTRUE)))\n            break;\n    }\n}\n
Run Code Online (Sandbox Code Playgroud)\n\n

但我的问题是在compareAndSwap函数中我必须比较和交换三个原子变量。有关问题的信息在这里

\n\n

(实际上new比较和交换函数中的变量并不重要,因为它是线程局部的)

\n\n

现在我的问题:我如何编写compareAndSwap函数来比较和交换三个原子变量或者我在哪里犯了错误?

\n\n

(请原谅我问了很长的问题)

\n\n

编辑:

\n\n

类似的问题在内存管理器论文中:

\n\n
function CompareAndSwapRef(link:pointer to pointer toNode,\nold:pointer toNode, new:pointer toNode):boolean\n    if CAS(link,old,new) then\n        if new=NULL then\n            FAA(&new.mmref,1);\n            new.mmtrace:=false;\n    if old=NULLthen FAA(&old.mmref,-1);\n    return true;\nreturn false; \n
Run Code Online (Sandbox Code Playgroud)\n\n

在这里,我必须再次比较和交换三个原子变量。\n(请注意,我的参数是类型Link,我必须比较和mPointer交换Link

\n

Mat*_*son 3

除非您可以使您要比较/交换的三个数据项适合两个指针大小的元素,否则您无法通过比较和交换来做到这一点(当然不能在 x86 上,而且我还没有听说过任何其他机器架构可以做到这一点)有这样的事)。

如果您依赖存储在(至少)与偶数字节地址对齐的地址上的数据,则在删除元素时可能会使用按位或来设置最低位。过去,人们一直使用地址的上部部分来存储额外的数据,但至少在 x86-64 中,这是不可能的,因为地址的上部部分必须是“规范的”,这意味着任何地址位高于“可用限制”(由处理器架构定义,当前为 48 位)的所有位都必须与可用限制的最高位相同(因此与位 47 相同)。

编辑:这部分代码完全符合我的描述:

    static const int dMask = 1;
    static const int ptrMask = ~dMask;

    Link() { } throw()
    Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw()
    { 
        std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
    }

    Node* pointer() const throw() 
    { 
        return reinterpret_cast<Node*>(
            std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
    }
Run Code Online (Sandbox Code Playgroud)

它使用最低位来存储pDel标志。

您应该能够使用cmpxchg16b(on x86) 的 a 形式对双链表执行此操作。在 Windows 系统中,这将是_InterlockedCompareExchange128. 在 gcc(对于 Unix 类型操作系统,例如 Linux/MacOS)中,您需要首先int128从两个指针构造 a 。如果您正在编译 32 位代码,则可能需要为 Windows 和 Unix 操作系统创建 64 位 int。