ang*_*lsl 5 c++ lock-free thread-sanitizer
我有一个无锁队列的实现,我相信它是正确的(或者至少是无数据竞争的):
#include <atomic>
#include <cstdint>
#include <iostream>
#include <optional>
#include <thread>
/// Payload carried through the queue.
///
/// Kept as a plain aggregate so that callers can brace-initialise it
/// positionally, e.g. `Job{1, 1}` (id first, data second).
struct Job {
  int id;    ///< First field; presumably identifies the job (only set, never read, in this file).
  int data;  ///< Second field; the value the consumer prints.
};
/// FIFO queue of Job values intended for concurrent producers and consumers.
///
/// Layout:
///  * The queue itself is a singly linked list that always ends in a dummy
///    node. `jobs_back` points at the dummy; `jobs_front` points at the oldest
///    node. A producer swaps a fresh dummy into `jobs_back`, then fills the
///    previous dummy and links it to the new one.
///  * Consumed nodes are never freed while the queue is alive; they are pushed
///    onto an internal free stack (`stack_top`) and reused by `allocate_node`.
///  * `jobs_front` and `stack_top` pair the pointer with a generation counter
///    that is bumped on every successful update, so a compare-exchange fails if
///    the pointer was changed and changed back in the meantime.
///
/// NOTE(review): `GenNodePtr` is two machine words, so whether
/// `std::atomic<GenNodePtr>` is lock-free depends on the target and toolchain.
///
/// The destructor must not run concurrently with any other member function.
class JobQueue {
  using stdmo = std::memory_order;

  struct Node {
    // Link to the next queue node, or to the next free node while the node
    // sits on the free stack.
    std::atomic<Node *> next = QUEUE_END;
    Job job;
  };

  // Terminator of the queue list.
  static inline Node *const QUEUE_END = nullptr;
  // Terminator of the free stack. It must differ from QUEUE_END so that a
  // consumer holding a stale front node that has since been recycled onto the
  // free stack does not mistake the stack terminator for "queue is empty".
  // Built from an integer rather than `QUEUE_END + 1`, because pointer
  // arithmetic on a null pointer is undefined behaviour. Never dereferenced.
  static inline Node *const STACK_END =
      reinterpret_cast<Node *>(std::uintptr_t{1});

  // Pointer plus generation counter, updated together by compare-exchange.
  struct GenNodePtr {
    Node *node;
    std::uintptr_t gen;
  };

  // Each hot atomic is given its own 64-byte alignment.
  alignas(64) std::atomic<Node *> jobs_back;
  alignas(64) std::atomic<GenNodePtr> jobs_front;
  alignas(64) std::atomic<GenNodePtr> stack_top;

  // Deletes every node from `head` up to (not including) the terminator `end`.
  // Only called from the destructor, where no other thread may touch the
  // nodes, so relaxed loads of the links are sufficient.
  static void delete_chain(Node *head, Node *end) {
    while (head != end) {
      Node *const following = head->next.load(stdmo::relaxed);
      delete head;
      head = following;
    }
  }

public:
  /// Creates an empty queue: one dummy node, an empty free stack, and both
  /// generation counters starting at 1.
  JobQueue()
      : jobs_back{new Node{}},
        jobs_front{GenNodePtr{jobs_back.load(stdmo::relaxed), 1}},
        stack_top{GenNodePtr{STACK_END, 1}} {}

  // The atomic members already make the class non-copyable; say so explicitly
  // because the class owns raw nodes.
  JobQueue(const JobQueue &) = delete;
  JobQueue &operator=(const JobQueue &) = delete;

  /// Frees every node still linked into the queue (including the dummy) and
  /// every node parked on the free stack.
  ~JobQueue() {
    delete_chain(jobs_front.load(stdmo::relaxed).node, QUEUE_END);
    delete_chain(stack_top.load(stdmo::relaxed).node, STACK_END);
  }

  /// Returns a node for reuse: pops one from the free stack if available,
  /// otherwise heap-allocates a new one. The caller must reset `next` before
  /// linking the node into the queue.
  Node *allocate_node() {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      if (cur_stack.node == STACK_END) {
        return new Node{};
      }
      // This may read the link of a node that another thread has already
      // popped and reused; that is harmless because the generation check in
      // the compare-exchange below then fails and we retry.
      Node *cur_stack_next = cur_stack.node->next.load(stdmo::relaxed);
      GenNodePtr new_stack{cur_stack_next, cur_stack.gen + 1};
      // Acquire on both outcomes: on failure `cur_stack` is refreshed and its
      // node's link is read again on the next iteration.
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel, stdmo::acquire)) {
        return cur_stack.node;
      }
    }
  }

  /// Pushes `node` onto the free stack so a later `allocate_node` can reuse it.
  /// `node` must no longer be reachable from the queue.
  void deallocate_node(Node *node) {
    GenNodePtr cur_stack = stack_top.load(stdmo::acquire);
    while (true) {
      node->next.store(cur_stack.node, stdmo::relaxed);
      GenNodePtr new_stack{node, cur_stack.gen + 1};
      // The release half publishes the link written just above (and the
      // caller's earlier accesses to the node) to whoever pops it.
      if (stack_top.compare_exchange_weak(cur_stack, new_stack,
                                          stdmo::acq_rel, stdmo::acquire)) {
        break;
      }
    }
  }

public:
  /// Appends `job` to the back of the queue.
  void enqueue(Job job) {
    Node *new_node = allocate_node();
    new_node->next.store(QUEUE_END, stdmo::relaxed);
    // The new node becomes the dummy; the previous dummy is now exclusively
    // ours to fill in until we publish it through its `next` link.
    Node *old_dummy = jobs_back.exchange(new_node, stdmo::acq_rel);
    old_dummy->job = job;
    // Release: makes the job (and new_node's reset link) visible to the
    // consumer that acquire-loads this link.
    old_dummy->next.store(new_node, stdmo::release);
  }

  /// Removes and returns the oldest job, or `std::nullopt` if the front node
  /// has not (yet) been linked to a successor.
  std::optional<Job> try_dequeue() {
    // Acquire here, paired with the release half of the compare-exchange
    // below, orders this thread after the consumer that made the current node
    // the front. Without it nothing forces the following read of `next` to be
    // newer than the producer's reset of that link, so on a recycled node the
    // read could return a value from an earlier incarnation while the
    // generation still matches.
    GenNodePtr old_front = jobs_front.load(stdmo::acquire);
    while (true) {
      Node *old_front_next = old_front.node->next.load(stdmo::acquire);
      if (old_front_next == QUEUE_END) {
        return std::nullopt;
      }
      GenNodePtr new_front{old_front_next, old_front.gen + 1};
      if (jobs_front.compare_exchange_weak(old_front, new_front,
                                           stdmo::acq_rel, stdmo::acquire)) {
        break;
      }
    }
    // The successful exchange gives this thread sole ownership of the old
    // front node, so the job can be read non-atomically before recycling it.
    Job job = old_front.node->job;
    deallocate_node(old_front.node);
    return job;
  }
};
int main() {
JobQueue queue;
std::atomic<int> i = 0;
std::thread consumer{[&queue, &i]() {
// producer enqueues 1
while (i.load(std::memory_order_relaxed) != 1) {}
std::atomic_thread_fence(std::memory_order_acq_rel);
std::cout << queue.try_dequeue().value_or(Job{-1, -1}).data
<< std::endl;
std::atomic_thread_fence(std::memory_order_acq_rel);
i.store(2, std::memory_order_relaxed);
// producer enqueues 2 and 3
}};
std::thread producer{[&queue, &i]() {
queue.enqueue(Job{1, 1});
std::atomic_thread_fence(std::memory_order_acq_rel);
i.store(1, std::memory_order_relaxed);
// consumer consumes here
while (i.load(std::memory_order_relaxed) != 2) {}
std::atomic_thread_fence(std::memory_order_acq_rel);
queue.enqueue(Job{2, 2});
queue.enqueue(Job{3, 3});
}};
producer.join();
consumer.join();
return 0;
}
Run Code Online (Sandbox Code Playgroud)
该队列被实现为单向链接的双端链表。它使用虚拟(dummy)节点来解耦生产者和消费者,并使用代计数器(generation counter)和节点回收(借助内部栈)来避免 ABA 问题以及 try_dequeue 中的释放后使用(use-after-free)。
用 Clang 13.0.1(Linux x64)编译并在 TSan 下运行此程序时,我得到以下数据竞争报告:
WARNING: ThreadSanitizer: data race (pid=223081)
Write of size 8 at 0x7b0400000008 by thread T2:
#0 JobQueue::enqueue(Job) .../bug4.cpp:85 (bug4.tsan+0xe3e53)
#1 operator() .../bug4.cpp:142 (bug4.tsan+0xe39ee)
...
Previous read of size 8 at 0x7b0400000008 by thread T1:
#0 JobQueue::try_dequeue() .../bug4.cpp:104 (bug4.tsan+0xe3c07)
#1 operator() .../bug4.cpp:121 (bug4.tsan+0xe381c)
...
Run Code Online (Sandbox Code Playgroud)
在 Godbolt 上运行(注意,由于 Godbolt 运行程序的方式,TSan 不显示行号信息)
该数据竞争涉及 consumer 线程调用 try_dequeue 时的先前读取(即 `Job job = old_front.node->job;`):
WARNING: ThreadSanitizer: data race (pid=223081)
Write of size 8 at 0x7b0400000008 by thread T2:
#0 JobQueue::enqueue(Job) .../bug4.cpp:85 (bug4.tsan+0xe3e53)
#1 operator() .../bug4.cpp:142 (bug4.tsan+0xe39ee)
...
Previous read of size 8 at 0x7b0400000008 by thread T1:
#0 JobQueue::try_dequeue() .../bug4.cpp:104 (bug4.tsan+0xe3c07)
#1 operator() .../bug4.cpp:121 (bug4.tsan+0xe381c)
...
Run Code Online (Sandbox Code Playgroud)
与之竞争的是稍后在 enqueue 中的写入,这是 producer 线程对 enqueue 的第三次调用:
old_dummy->job = job;
Run Code Online (Sandbox Code Playgroud)
我认为这种竞争不可能发生,因为 producer 线程应当通过 allocate_node 和 deallocate_node 中对 stack_top 的获取-释放(acquire-release)比较并交换操作与 consumer 线程同步。
奇怪的是,给 GenNodePtr 加上 alignas(32) 之后,这个竞争报告就消失了。
问题: