锁定免费队列 - 单一生产者,多个消费者

Shi*_*ish 17 c++ queue atomic lock-free

我正在寻找一种方法来实现支持单个生产者和多个消费者的无锁队列数据结构.我看过Maged Michael和Michael Scott(1996)的经典方法,但他们的版本使用链表.我想要一个使用有界循环缓冲区的实现.什么东西使用原子变量?

另外,我不确定为什么这些经典方法是为需要大量动态内存管理的链表设计的.在多线程程序中,所有内存管理例程都是序列化的.我们不是通过将它们与动态数据结构结合使用来破坏无锁方法的好处吗?

我试图在英特尔64位架构上使用pthread库在C/C++中编写代码.

谢谢Shirish

Lan*_*uck 9

使用圆形缓冲器需要锁定,因为需要阻塞以防止头部越过尾部.但是否则头部和尾部指针可以很容易地以原子方式更新.或者在某些情况下,缓冲区可能非常大,以至于覆盖不是问题.(在现实生活中,你会在自动交易系统中看到这一点,循环缓冲区的大小可以容纳X分钟的市场数据.如果你落后X分钟,你会比覆盖你的缓冲区更糟糕的问题).

当我在C++中实现MS队列时,我使用堆栈构建了一个无锁分配器,这很容易实现.如果我有MSQueue,那么在编译时我知道sizeof(MSQueue :: node).然后我制作一堆所需大小的N个缓冲区.N可以增长,即如果pop()返回null,很容易向堆请求更多的块,并将它们推送到堆栈中.在可能阻塞的更多内存调用之外,这是一个无锁操作.

请注意,T不能有非平凡的dtor.我研究过一个允许非平凡dtors的版本,实际上是有效的.但我发现将T指向我想要的T,生产者释放所有权,消费者获得所有权更容易.这当然要求T本身使用lockfree方法分配,但我在堆栈中使用的相同分配器也在这里工作.

在任何情况下,无锁编程的要点都不是数据结构本身更慢.要点是这样的:

  1. lock free使我独立于调度程序.基于锁的编程取决于调度程序,以确保锁的持有者正在运行,以便他们可以释放锁.这就是导致"优先级倒置"的原因在Linux上有一些锁定属性可以确保这种情况发生
  2. 如果我独立于调度程序,操作系统管理时间片的时间要容易得多,而且我的上下文切换要少得多
  3. 使用lockfree方法编写正确的多线程程序更容易,因为我不必担心死锁,活锁,调度,同步等等.这对于共享内存实现尤其如此,其中进程可能会在共享内存中锁定时死亡,并且没有办法释放锁
  4. 无锁方法更容易扩展.实际上,我已经使用网络上的消息传递实现了无锁方法.像这样的分布式锁是一场噩梦

也就是说,在许多情况下,基于锁定的方法是优选的和/或需要的

  1. 更新昂贵或无法复制的东西时.大多数无锁方法使用某种版本控制,即制作对象的副本,更新它,并检查共享版本是否仍然与复制时相同,然后使当前版本更新版本.Els再次复制它,应用更新,然后再次检查.继续这样做直到它工作.当对象很小但是它们很大,或包含文件句柄等时,这很好,不推荐使用
  2. 大多数类型都无法以无锁方式访问,例如任何STL容器.它们具有需要非原子访问的不变量,例如assert(vector.size()== vector.end() - vector.begin()).因此,如果要更新/读取共享的向量,则必须将其锁定.


Mat*_*att 6

这是一个古老的问题,但没有人提供可接受的解决方案。因此,我为可能正在搜索的其他人提供此信息。

本网站:http : //www.1024cores.net

提供了一些非常有用的无锁/无等待数据结构,并带有详尽的解释。

您正在寻求的是针对读写器问题的无锁解决方案。

请参阅:http : //www.1024cores.net/home/lock-free-algorithms/reader-writer-problem


Kil*_*nDS 1

对于传统的单块循环缓冲区,我认为这根本无法通过原子操作安全地完成。你需要一口气读完这么多内容。假设您有一个具有以下结构的结构:

uint8_t* buf;
unsigned int size; // Actual max. buffer size
unsigned int length; // Actual stored data length (suppose in write prohibited from being > size)
unsigned int offset; // Start of current stored data
Run Code Online (Sandbox Code Playgroud)

在阅读时,您需要执行以下操作(这就是我实现它的方式,您可以交换一些步骤,就像我稍后讨论的那样):

  1. 检查读取长度是否超过存储长度
  2. 检查偏移量+读取长度是否超出缓冲区边界
  3. 读出数据
  4. 增加偏移量,减少长度

为了使这项工作正常进行,你应该做什么同步(如此原子)?实际上将步骤 1 和 4 结合在一个原子步骤中,或者澄清一下:同步执行此操作:

  1. 检查read_length,这可以是这样的read_length=min(read_length,length);
  2. 使用 read_length 减少长度:length-=read_length
  3. 从偏移量获取本地副本unsigned int local_offset = offset
  4. 增加 read_length 的偏移量:offset+=read_length

之后,您可以从 local_offset 开始执行 memcpy (或其他任何操作),检查您的读取是否超过循环缓冲区大小(分成 2 个 memcpy),...。这是“相当”线程安全的,您的写入方法仍然可以写入您正在读取的内存,因此请确保您的缓冲区确实足够大,以最大限度地减少这种可能性。

现在,虽然我可以想象你可以在原子操作中组合 3 和 4(我猜这就是它们在链表情况下所做的事情),甚至是 1 和 2,但我看不到你在一个原子操作中完成整个交易:)。

然而,您可以尝试放弃“长度”,检查您的消费者是否非常聪明并且总是知道该读什么。那么您还需要一个新的 woffset 变量,因为 (offset+length)%size 来确定写入偏移量的旧方法不再起作用。请注意,这与链接列表的情况很接近,您实际上总是从列表中读取一个元素(=固定、已知大小)。同样在这里,如果你把它做成一个循环链表,你就可以读太多或写到你当时正在读的位置!

最后:我的建议,只要使用锁,我使用 CircularBuffer 类(读和写完全安全)作为实时 720p60 视频流,并且我完全没有因锁定而出现速度问题。