val*_*umr 1 c stack lockless rcu aba
我提出了一个想法,我试图实现一个无锁堆栈,不依赖于引用计数来解决ABA问题,并且还正确处理内存回收.它在概念上与RCU类似,并且依赖于两个特征:将列表条目标记为已删除,以及跟踪阅读器遍历列表.前者很简单,它只使用指针的LSB.后者是我对实现无限制无锁堆栈的方法的"聪明"尝试.
基本上,当任何线程试图遍历列表时,一个原子计数器(list.entries)会递增.遍历完成后,第二个计数器(list.exits)递增.
节点分配由push处理,释放由pop处理.
推送和弹出操作与天真无锁堆栈实现非常相似,但必须遍历标记为删除的节点才能到达未标记的条目.因此推送基本上就像链表插入一样.
pop操作类似地遍历列表,但它使用atomic_fetch_or将节点标记为在遍历时被删除,直到它到达未标记的节点.
遍历0个或更多标记节点的列表后,弹出的线程将尝试CAS堆栈的头部.至少有一个并发弹出的线程将成功,在此之后,进入堆栈的所有读者将不再看到以前标记的节点.
成功更新列表的线程然后加载原子list.entries,并基本上自旋加载atomic.exits,直到该计数器最终超过list.entries.这应该意味着列表的"旧"版本的所有读者都已完成.然后,该线程只是释放它从列表顶部交换的标记节点列表.
因此,弹出操作的含义应该(我认为)可能没有ABA问题,因为释放的节点不会返回到可用的指针池,直到所有使用它们的并发读取器完成,显然内存回收问题由于同样的原因,处理也是如此.
所以,无论如何,这是理论,但我仍然在实现上摸不着头脑,因为它目前无法正常工作(在多线程情况下).似乎我在免费问题之后得到了一些写作,但是我在查找问题时遇到了麻烦,或者我的假设可能存在缺陷而且无法正常工作.
无论是概念还是调试代码的方法,都会非常感谢任何见解.
这是我当前(损坏的)代码(使用gcc -D_GNU_SOURCE -std = c11 -Wall -O0 -g -pthread -o list list.c编译):
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/resource.h>
#include <stdio.h>
#include <unistd.h>
#define NUM_THREADS 8
#define NUM_OPS (1024 * 1024)
typedef uint64_t list_data_t;
typedef struct list_node_t {
struct list_node_t * _Atomic next;
list_data_t data;
} list_node_t;
typedef struct {
list_node_t * _Atomic head;
int64_t _Atomic size;
uint64_t _Atomic entries;
uint64_t _Atomic exits;
} list_t;
enum {
NODE_IDLE = (0x0),
NODE_REMOVED = (0x1 << 0),
NODE_FREED = (0x1 << 1),
NODE_FLAGS = (0x3),
};
static __thread struct {
uint64_t add_count;
uint64_t remove_count;
uint64_t added;
uint64_t removed;
uint64_t mallocd;
uint64_t freed;
} stats;
#define NODE_IS_SET(p, f) (((uintptr_t)p & f) == f)
#define NODE_SET_FLAG(p, f) ((void *)((uintptr_t)p | f))
#define NODE_CLR_FLAG(p, f) ((void *)((uintptr_t)p & ~f))
#define NODE_POINTER(p) ((void *)((uintptr_t)p & ~NODE_FLAGS))
list_node_t * list_node_new(list_data_t data)
{
list_node_t * new = malloc(sizeof(*new));
new->data = data;
stats.mallocd++;
return new;
}
void list_node_free(list_node_t * node)
{
free(node);
stats.freed++;
}
static void list_add(list_t * list, list_data_t data)
{
atomic_fetch_add_explicit(&list->entries, 1, memory_order_seq_cst);
list_node_t * new = list_node_new(data);
list_node_t * _Atomic * next = &list->head;
list_node_t * current = atomic_load_explicit(next, memory_order_seq_cst);
do
{
stats.add_count++;
while ((NODE_POINTER(current) != NULL) &&
NODE_IS_SET(current, NODE_REMOVED))
{
stats.add_count++;
current = NODE_POINTER(current);
next = ¤t->next;
current = atomic_load_explicit(next, memory_order_seq_cst);
}
atomic_store_explicit(&new->next, current, memory_order_seq_cst);
}
while(!atomic_compare_exchange_weak_explicit(
next, ¤t, new,
memory_order_seq_cst, memory_order_seq_cst));
atomic_fetch_add_explicit(&list->exits, 1, memory_order_seq_cst);
atomic_fetch_add_explicit(&list->size, 1, memory_order_seq_cst);
stats.added++;
}
static bool list_remove(list_t * list, list_data_t * pData)
{
uint64_t entries = atomic_fetch_add_explicit(
&list->entries, 1, memory_order_seq_cst);
list_node_t * start = atomic_fetch_or_explicit(
&list->head, NODE_REMOVED, memory_order_seq_cst);
list_node_t * current = start;
stats.remove_count++;
while ((NODE_POINTER(current) != NULL) &&
NODE_IS_SET(current, NODE_REMOVED))
{
stats.remove_count++;
current = NODE_POINTER(current);
current = atomic_fetch_or_explicit(¤t->next,
NODE_REMOVED, memory_order_seq_cst);
}
uint64_t exits = atomic_fetch_add_explicit(
&list->exits, 1, memory_order_seq_cst) + 1;
bool result = false;
current = NODE_POINTER(current);
if (current != NULL)
{
result = true;
*pData = current->data;
current = atomic_load_explicit(
¤t->next, memory_order_seq_cst);
atomic_fetch_add_explicit(&list->size,
-1, memory_order_seq_cst);
stats.removed++;
}
start = NODE_SET_FLAG(start, NODE_REMOVED);
if (atomic_compare_exchange_strong_explicit(
&list->head, &start, current,
memory_order_seq_cst, memory_order_seq_cst))
{
entries = atomic_load_explicit(&list->entries, memory_order_seq_cst);
while ((int64_t)(entries - exits) > 0)
{
pthread_yield();
exits = atomic_load_explicit(&list->exits, memory_order_seq_cst);
}
list_node_t * end = NODE_POINTER(current);
list_node_t * current = NODE_POINTER(start);
while (current != end)
{
list_node_t * tmp = current;
current = atomic_load_explicit(¤t->next, memory_order_seq_cst);
list_node_free(tmp);
current = NODE_POINTER(current);
}
}
return result;
}
static list_t list;
pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;
void * thread_entry(void * arg)
{
sleep(2);
int id = *(int *)arg;
for (int i = 0; i < NUM_OPS; i++)
{
bool insert = random() % 2;
if (insert)
{
list_add(&list, i);
}
else
{
list_data_t data;
list_remove(&list, &data);
}
}
struct rusage u;
getrusage(RUSAGE_THREAD, &u);
pthread_mutex_lock(&ioLock);
printf("Thread %d stats:\n", id);
printf("\tadded = %lu\n", stats.added);
printf("\tremoved = %lu\n", stats.removed);
printf("\ttotal added = %ld\n", (int64_t)(stats.added - stats.removed));
printf("\tadded count = %lu\n", stats.add_count);
printf("\tremoved count = %lu\n", stats.remove_count);
printf("\tadd average = %f\n", (float)stats.add_count / stats.added);
printf("\tremove average = %f\n", (float)stats.remove_count / stats.removed);
printf("\tmallocd = %lu\n", stats.mallocd);
printf("\tfreed = %lu\n", stats.freed);
printf("\ttotal mallocd = %ld\n", (int64_t)(stats.mallocd - stats.freed));
printf("\tutime = %f\n", u.ru_utime.tv_sec
+ u.ru_utime.tv_usec / 1000000.0f);
printf("\tstime = %f\n", u.ru_stime.tv_sec
+ u.ru_stime.tv_usec / 1000000.0f);
pthread_mutex_unlock(&ioLock);
return NULL;
}
int main(int argc, char ** argv)
{
struct {
pthread_t thread;
int id;
}
threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++)
{
threads[i].id = i;
pthread_create(&threads[i].thread, NULL, thread_entry, &threads[i].id);
}
for (int i = 0; i < NUM_THREADS; i++)
{
pthread_join(threads[i].thread, NULL);
}
printf("Size = %ld\n", atomic_load(&list.size));
uint32_t count = 0;
list_data_t data;
while(list_remove(&list, &data))
{
count++;
}
printf("Removed %u\n", count);
}
Run Code Online (Sandbox Code Playgroud)
您提到您正在尝试解决ABA问题,但描述和代码实际上是尝试解决更难的问题:内存回收问题.
此问题通常出现在无垃圾收集的语言中实现的无锁集合的"删除"功能中.核心问题是从共享结构中删除节点的线程通常不知道何时释放已删除的节点是安全的,因为其他读取可能仍然具有对它的引用.作为一个副作用,经常解决这个问题也解决了ABA问题:即使底层指针(和对象的状态)在此期间至少被改变了两次,这也特别是关于CAS操作的成功,最终得到了原始价值,但呈现完全不同的状态.
ABA问题更容易,因为ABA问题有几种直接的解决方案,特别是不会导致"内存回收"问题的解决方案.在某种意义上,能够检测到位置修改的硬件(例如,使用LL/SC或事务存储器基元)可能根本不会出现问题.
所以说,你正在寻找内存回收问题的解决方案,它也将避免ABA问题.
您的问题的核心是此声明:
成功更新列表的线程然后加载原子list.entries,并基本上自旋加载atomic.exits,直到该计数器最终超过list.entries.这应该意味着列表的"旧"版本的所有读者都已完成.然后,该线程只是释放它从列表顶部交换的标记节点列表.
这个逻辑不成立.等待list.exits
(你说atomic.exits,但我认为这是因为你只有大约谈了错字list.exits
其他地方)要大于list.entries
只告诉你有,现在已经更多的总出口比有项目在,突变线捕获的条目计数点.然而,这些出口可能是由新的读者来来往往产生的:它并不意味着所有的老读者都按照你的要求完成了!
这是一个简单的例子.首先,写入线程T1
和读取线程同时T2
访问列表,因此list.entries
是2并且list.exits
为0.写入线程弹出一个节点,并保存当前值(2)list.entries
并等待lists.exits
大于2.现在三多读线程T3
,T4
,T5
到达并执行列表中的快速阅读和离开.现在lists.exits
是3,满足您的条件并T1
释放节点.T2
因为它正在读取一个释放的节点,所以没有去过任何地方并且爆炸!
你有的基本想法可以奏效,但你的两个反制方法尤其肯定是行不通的.
这是一个经过充分研究的问题,因此您不必创建自己的算法(请参阅上面的链接),甚至编写自己的代码,因为librcu和concurrencykit之类的东西已经存在.
如果你想让这项工作用于教育目的,一种方法是使用确保在修改开始后进入的线程使用一组不同的list.entry/exit
计数器.一种方法是生成计数器,当编写者想要修改列表时,它会增加生成计数器,这会导致新读者切换到另一组list.entry/exit
计数器.
现在作家只需要等待list.entry[old] == list.exists[old]
,这意味着所有的老读者都离开了.你也可以通过每代一个计数器逃脱:你真的没有两个entry/exit
计数器(虽然它可能有助于减少争用).
当然,你知道每一代管理这个单独计数器列表的新问题......哪种看起来像建立一个无锁列表的原始问题!这个问题有点容易,因为你可能会对"飞行中"的代数进行一些合理的限制,并且只是将它们全部预先分配,或者你可以实现一种更容易推理的有限类型的无锁列表.因为添加和删除仅发生在头部或尾部.