有趣的是,我发现很多程序员错误地认为"无锁"只意味着"没有互斥的并发编程".通常,还存在一个相关的误解,即编写无锁代码的目的是为了获得更好的并发性能.当然,无锁的正确定义实际上是关于进度保证.无锁算法保证至少一个线程能够前进,无论其他线程正在做什么.
这意味着无锁算法永远不会有一个代码,其中一个线程依赖于另一个线程才能继续.例如,无锁代码不能具有线程A设置标志的情况,然后线程B在等待线程A取消设置标志时保持循环.这样的代码基本上实现了一个锁(或者我称之为伪装的互斥锁).
然而,其他情况更微妙,在某些情况下我真的无法确定算法是否符合无锁定的要求,因为"取得进步"的概念有时对我来说似乎是主观的.
其中一个例子是(备受好评的,afaik)并发库liblfds.我正在研究liblfds中多生产者/多消费者有界队列的实现 - 实现非常简单,但我无法确定它是否应该符合无锁定条件.
相关算法在lfds711_queue_bmm_enqueue.c
.Liblfds使用自定义原子和内存障碍,但算法很简单,我可以用段落左右来描述.
队列本身是一个有界的连续数组(ringbuffer).共享read_index
和write_index
.队列中的每个插槽都包含一个用户数据字段和一个sequence_number
值,它基本上类似于一个纪元计数器.(这避免了ABA问题).
PUSH算法如下:
write_index
write_index % queue_size
使用试图设置write_index
为的CompareAndSwap循环在队列中保留一个插槽write_index + 1
.sequence_index
通过使其等于来更新插槽write_index + 1
.实际的源代码使用自定义原子和内存障碍,因此为了进一步清楚这个算法,我简要地将它翻译成(未经测试的)标准C++原子以获得更好的可读性,如下所示:
bool mcmp_queue::enqueue(void* data)
{
int write_index = m_write_index.load(std::memory_order_relaxed);
for (;;)
{
slot& s = m_slots[write_index % m_num_slots];
int sequence_number = s.sequence_number.load(std::memory_order_acquire);
int difference = sequence_number - write_index;
if (difference == 0)
{
if (m_write_index.compare_exchange_weak(
write_index,
write_index + …
Run Code Online (Sandbox Code Playgroud) 所以,经过一番研究后,我写了一个队列.它使用固定大小的缓冲区,因此它是一个循环队列.它必须是线程安全的,我试图让它无锁.我想知道它有什么问题,因为这些事情我自己很难预测.
这是标题:
template <class T>
class LockFreeQueue
{
public:
LockFreeQueue(uint buffersize) : buffer(NULL), ifront1(0), ifront2(0), iback1(0), iback2(0), size(buffersize) { buffer = new atomic <T>[buffersize]; }
~LockFreeQueue(void) { if (buffer) delete[] buffer; }
bool pop(T* output);
bool push(T input);
private:
uint incr(const uint val)
{return (val + 1) % size;}
atomic <T>* buffer;
atomic <uint> ifront1, ifront2, iback1, iback2;
uint size;
};
Run Code Online (Sandbox Code Playgroud)
这是实施:
template <class T>
bool LockFreeQueue<T>::pop(T* output)
{
while (true)
{
/* Fetch ifront and store it in i. …
Run Code Online (Sandbox Code Playgroud)