无锁简单隔离存储算法

Sil*_*ler 5 c++ atomic lock-free c++11

我正在使用C++ 编写一个" Simple Segregated Storage "内存池的无锁版本.

SSS内存池类似于slab分配器:它基本上只是一块内存,被分成大小相等的块,我们有一个指向第一个可用块的空闲列表指针.分配是简单地移动鼠标指针到下一个块,和释放简直是自由表指针设置解除了分配的块,并指出解除了分配的块空闲表指针的旧值"下一步"指针.

所以它基本上是一个单链表.

现在,我正在尝试编写Simple Segregated Storage算法的无锁版本.如果我们假设的数量性状分离的内存初始块(即创建链接的列表)一直在做之前进入一个多线程环境中,我们只需要担心的分配和释放块-在这种情况下,这个问题变得非常类似于锁 - 免费的单链表,这是一个很好理解的问题.

因此,在我看来,通过使用简单的比较和交换指令,可以轻松地以无锁方式完成分配和解除分配.

假设我们有以下空闲列表指针:

std::atomic<unsigned char*> m_first
Run Code Online (Sandbox Code Playgroud)

并假设我们有一个辅助函数,nextof()它接受一个块指针并返回对"下一个指针"的引用.下一个指针实际上嵌入在内存块中 - 所以nextof函数看起来像这样:

unsigned char*& nextof(void* ptr)
{
    return *static_cast<unsigned char**>(ptr);
}
Run Code Online (Sandbox Code Playgroud)

这至少是Boost库如何实现简单隔离存储.

无论如何,我对无锁原子allocate函数的尝试是:

void* malloc()
{
    unsigned char* first = m_first.load();
    if (!first) return nullptr;

    while (!m_first.compare_exchange_strong(first, nextof(first))) 
    {
        if (!first) break;
    }

    return first;
}
Run Code Online (Sandbox Code Playgroud)

所以 - 这看起来非常简单.我们需要做的就是原子地设置m_first为嵌入在指向的块中的下一个指针m_first.最初,我认为这就像说的那么简单m_first.store(nextof(m_first))- 但不幸的是我认为这不是原子的,因为我们实际上是在同一个内存位置上存储和加载 - 我们正在存储一个新值m_first,但也加载值m_first以获得它的下一个指针.

所以,似乎我们需要进行比较和交换.在上面的代码中,我原子地将值加载m_first到局部变量中.然后,在检查null之后,我原子地改变m_firstCAS 的值,当且仅当它仍然等于局部变量的值时.如果没有,则局部变量更新m_first为现在的任何内容,并且我们继续循环直到我们没有被其他线程抢占.

该释放是有点麻烦-在这里,我们需要改变的值,m_first解除了分配的块,并且更新下一个指针重新分配的块指向什么m_first曾经是.

我的尝试是:

void free(void* chunk)
{
    unsigned char* first = m_first.load();
    nextof(chunk) = first;

    while (!m_first.compare_exchange_strong(first, static_cast<unsigned char*>(chunk)))
    {
        nextof(chunk) = first;
    }
}
Run Code Online (Sandbox Code Playgroud)

我不太确定我有这个权利,但我认为这是正确的.首先,我原子地加载值m_first并将其存储在局部变量中.然后,我将下一个指针设置在我的本地副本的块上m_first.当然,到现在为止,另一个线程可以干预并设置m_first为其他东西 - 所以我现在必须做一个CAS操作来检查它m_first是否仍然是我所期望的.如果不是,我必须将下一个指针设置为适当的值,然后继续循环.

据我所知,这是所有竞赛条件安全的,只有一个CAS操作mallocfree.

问题:由于无锁算法很难,我不确定我是否正确.我是否忽视了可能导致竞争状况的事情?

Gro*_*roo 0

一个旧答案,但我想我可以发表意见。除非我弄错了,否则我相信代码可能会通过ABA 问题的某个版本泄漏数据。

问题是在将参数传递给内部nextof(first)原子之前对其进行评估。如果我们从函数中取出循环:compare_exchange_strongmallocwhilemalloc

while (!m_first.compare_exchange_strong(first, nextof(first)))
{
    if (!first) return nullptr;
}
Run Code Online (Sandbox Code Playgroud)

并重写它以nextof(first)在传递给 之前显式求值compare_exchange_strong,以使其更加明显:

while (true)
{
    auto tmp_next = nextof(first);

    // this is where the chance for ABA exists

    if (m_first.compare_exchange_strong(first, tmp_next))
        break;

    if (!first) return nullptr;
}
Run Code Online (Sandbox Code Playgroud)

然后似乎有可能m_first在这两行之间更改为不同的值并返回,同时另一个线程实际发生变化nextof(m_first)

// I will represent the linked list using letters,
// i.e. m_first -> "A" means that "A" is at the head,
// and "A" -> "B" means that "A" points to "B"

void* malloc()
{
    // at the start, m_first points to "A": m_first -> "A" -> "B" -> "C"
    unsigned char* first = m_first.load();

    // --> thread #2 calls malloc() here and acquires "A", and
    //     removes it, so: m_first -> "B" -> "C"

    if (!first) return nullptr;

    while (true)
    {
        unsigned char * tmp_next = nextof(first);
        // tmp_next is "B" at this moment, because 'first' is still "A"            

        // --> some third thread returns some even older object now,
        //     so m_first -> "M" -> "B" -> "C"

        // --> thread #2 now calls free() to return "A", resulting in
        //     my_first -> "A" -> "M" -> "B" -> "C"

        // m_first is now back to "A", but tmp_next points
        // to old "B" instead of "M", and the following line succeeds
        // and "M" is lost forever: m_first -> "B" -> "C"

        if (m_first.compare_exchange_strong(first, tmp_next))
            break;

        if (!first) return nullptr;
    }

    return first;
}
Run Code Online (Sandbox Code Playgroud)

关于 ABA 的维基百科文章包含一些解决这些问题的建议,例如“标记状态引用”(基本上向这些指针添加“事务计数器”之类的内容,以便您可以检测到第二个“A”与第一个“A”不同) ),或者我发现实现起来稍微复杂一些的“中间节点”(显然是由John D. Valois 在本文中发明的)。