C 无锁队列内存管理

p0w*_*0wl 5 c multithreading memory-leaks memory-management lock-free

为了提高我的 C 技能,我实现了一个线程安全且无锁的队列。该算法来自 Maurice Herlihy 和 Nir ​​Shavit 所著的《多处理器编程的艺术》一书的第 10.5 章,顺便说一句,这是一本很棒的书。

到目前为止,一切正常,但我需要帮助解决以下问题:

问题

该行在方法free(first)中被注释掉,lfq_deq()因为如果队列被多个出列器使用,它可能会导致段错误。如果线程 T1 和 T2 正在出列,并且 T1 释放节点而 T2 仍在使用该节点,则 T2 将产生段错误。

释放内存的优雅方法是什么?由于我不重用节点,所以我不应该遇到 ABA 问题,对吗?或者您认为,重用节点并实现 ABA 问题的已知解决方案更容易吗?

lfq.h

标头提供了一个简单的主要测试方法。

#pragma once
#include <stdlib.h>

typedef struct Node {
    void* data;
    struct Node* next;
} lfq_node_t;

typedef struct Queue {
    lfq_node_t* head;
    lfq_node_t* tail;
} lfq_t;

lfq_t* lfq_new();
void lfq_free(lfq_t* q);
void lfq_enq(lfq_t* q, void* data);
void* lfq_deq(lfq_t* q);
Run Code Online (Sandbox Code Playgroud)

lfq.c

#include "lfq.h"
#include <pthread.h>
#include <stdio.h>

#define CAS(a, b, c) __sync_bool_compare_and_swap(a, b, c)

lfq_t* lfq_new() {
    lfq_t* q = malloc(sizeof(*q));
    lfq_node_t* sentinel = malloc(sizeof(*sentinel));
    sentinel->data = sentinel->next = NULL;
    q->head = q->tail = sentinel;

    return q;
}

void lfq_free(lfq_t* q) {
    lfq_node_t *next, *node = q->head;
    while (node != NULL) {
        next = node->next;
        free(node);
        node = next;
    }
    free(q);
}

void lfq_enq(lfq_t* q, void* data) {
    lfq_node_t *node, *last, *next;

    node = malloc(sizeof(*node));
    node->data = data;
    node->next = NULL;

    while (1) {
        last = q->tail;
        next = last->next;
        if (last == q->tail) {
            if (next == NULL) {
                if (CAS(&(last->next), next, node)) {
                    CAS(&(q->tail), last, node);
                    return;
                }
            } else {
                CAS(&(q->tail), last, next);
            }
        }
    }
}

void* lfq_deq(lfq_t* q) {
    lfq_node_t *first, *last, *next;
    while (1) {
        first = q->head;
        last = q->tail;
        next = first->next;

        if (first == q->head) {
            if (first == last) {
                if (next == NULL) return NULL;
                CAS(&(q->tail), last, next);
            } else {
                void* data = first->next->data;
                if (CAS(&(q->head), first, next)) {
                    // free(first);
                    return data;
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

主程序

测试队列的简单 main 方法:

#include "lfq.h"
#include <stdio.h>

int main() {
    int values[] = {1, 2, 3, 4, 5};
    lfq_t* q = lfq_new();
    for (int i = 0; i < 5; ++i) {
        printf("ENQ %i\n", values[i]);
        lfq_enq(q, &values[i]);
    }
    for (int i = 0; i < 5; ++i) printf("DEQ %i\n", *(int*)lfq_deq(q));
    lfq_free(q);
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

p0w*_*0wl 2

正如Peter Cordes在他的评论中指出的那样,我刚刚发现了内存回收问题:

相比之下,内存回收是无锁数据结构设计中最具挑战性的方面之一。无锁算法(也称为非阻塞算法)保证只要某个进程继续采取步骤,最终某个进程就会完成某项操作。对无锁数据结构执行内存回收的主要困难是,进程可能在休眠时持有指向即将释放的对象的指针。因此,不小心释放对象可能会导致睡眠进程在醒来时访问已释放的内存,从而导致程序崩溃或产生细微的错误。由于节点没有被锁定,进程必须协调以让彼此知道哪些节点可以安全回收,哪些节点仍然可以访问。

引自奥地利科学技术研究所 Trevor Brown 的《回收无锁数据结构的内存:必须有更好的方法》

可以在这里找到一个很好的答案(与堆栈相关,但本质上是相同的)。