Hay*_*ach 5 c++ queue multithreading lock-free c++11
所以,经过一番研究后,我写了一个队列.它使用固定大小的缓冲区,因此它是一个循环队列.它必须是线程安全的,我试图让它无锁.我想知道它有什么问题,因为这些事情我自己很难预测.
这是标题:
template <class T>
class LockFreeQueue
{
public:
LockFreeQueue(uint buffersize) : buffer(NULL), ifront1(0), ifront2(0), iback1(0), iback2(0), size(buffersize) { buffer = new atomic <T>[buffersize]; }
~LockFreeQueue(void) { if (buffer) delete[] buffer; }
bool pop(T* output);
bool push(T input);
private:
uint incr(const uint val)
{return (val + 1) % size;}
atomic <T>* buffer;
atomic <uint> ifront1, ifront2, iback1, iback2;
uint size;
};
Run Code Online (Sandbox Code Playgroud)
这是实施:
template <class T>
bool LockFreeQueue<T>::pop(T* output)
{
while (true)
{
/* Fetch ifront and store it in i. */
uint i = ifront1;
/* If ifront == iback, the queue is empty. */
if (i == iback2)
return false;
/* If i still equals ifront, increment ifront, */
/* Incrememnting ifront1 notifies pop() that it can read the next element. */
if (ifront1.compare_exchange_weak(i, incr(i)))
{
/* then fetch the output. */
*output = buffer[i];
/* Incrememnting ifront2 notifies push() that it's safe to write. */
++ifront2;
return true;
}
/* If i no longer equals ifront, we loop around and try again. */
}
}
template <class T>
bool LockFreeQueue<T>::push(T input)
{
while (true)
{
/* Fetch iback and store it in i. */
uint i = iback1;
/* If ifront == (iback +1), the queue is full. */
if (ifront2 == incr(i))
return false;
/* If i still equals iback, increment iback, */
/* Incrememnting iback1 notifies push() that it can write a new element. */
if (iback1.compare_exchange_weak(i, incr(i)))
{
/* then store the input. */
buffer[i] = input;
/* Incrementing iback2 notifies pop() that it's safe to read. */
++iback2;
return true;
}
/* If i no longer equals iback, we loop around and try again. */
}
}
Run Code Online (Sandbox Code Playgroud)
编辑:我根据评论(感谢KillianDS和nm!)对代码做了一些重大修改.最重要的是,ifront和iback现在是ifront1,ifront2,iback1和iback2.push()现在将递增iback1,通知其他推送线程,它们可以安全地写入下一个元素(只要它不是满的),写入元素,然后递增iback2.iback2是pop()检查的全部内容.pop()执行相同的操作,但使用ifrontn索引.
现在,再一次,我陷入了"这应该工作......"的陷阱,但我对形式证明或类似的东西一无所知.至少这次,我想不出它可能失败的潜在方式.除了"停止尝试编写无锁代码"之外,任何建议都表示赞赏.
接近无锁数据结构的正确方法是编写一个半正式证明,证明您的设计在伪代码中工作.你不应该问"这个锁定免费代码线程是否安全",而是"我证明这个锁定免费代码是线程安全的有什么错误吗?"
只有在您获得伪代码设计有效的正式证据之后,才会尝试实现它.通常这会带来垃圾收集等必须小心处理的问题.
您的代码应该是注释中的形式证明和伪代码,其中散布着相对不重要的实现.
验证代码是否正确则需要了解伪代码,检查证明,然后检查代码是否无法映射到伪代码和证明.
直接获取代码并尝试检查它是否锁定是不切实际的.证明是正确设计这类事物的重要因素,实际的代码是次要的,因为证明是困难的部分.
而之后和当你做了上述所有的,以及有其他人验证它,你必须把通过实际测试你的代码,看看,如果你有一个盲点,并有一个洞,或者不明白你的并发原语,或者如果你的并发原语中有bug.
如果您对编写半正式证明不感兴趣来设计代码,则不应该手动滚动无锁算法和数据结构,并将它们放在生产代码中.
确定一堆代码"是否是线程安全的"将所有工作负担都放在其他人身上.你需要有一个论点,为什么你的代码"是线程安全的"安排,以便其他人尽可能容易地找到它的漏洞.如果你的论点为什么你的代码"是线程安全的"被安排的方式使得更难找到漏洞,那么你的代码就不能被认为是线程安全的,即使没有人能在你的代码中发现漏洞.
你上面发布的代码很乱.它包含注释掉的代码,没有正式的不变量,没有线条的证明,没有强烈描述为什么它是线程安全的,并且一般不提出尝试以一种易于发现的方式显示自己作为线程安全缺陷.因此,没有合理的读者会认为代码线程是安全的,即使他们在其中找不到任何错误.
不,它不是线程安全的 - 考虑以下事件顺序:
if (ifront.compare_exchange_weak(i, incr(i)))
并由pop
调度程序进入睡眠状态。size
时间(足以使 ifront 等于i
第一个线程中的值)。在这种情况下,popbuffer[i]
将包含最后推送的值,这是错误的。