我正在设计一个连接到一个或多个数据源流的系统,并对数据进行一些分析,而不是基于结果触发事件.在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列,并且多个消费者线程读取数据,并且消费者仅对最新数据点加上n个点感兴趣.如果慢速消费者无法跟上,生产者线程将不得不阻止,当然,当没有未经处理的更新时,消费者线程将被阻止.使用具有读取器/写入器锁的典型并发队列将很好地工作,但是进入的数据速率可能很大,因此我希望减少锁定开销,尤其是生产者的写入锁.我认为我需要一个循环无锁缓冲区.
现在有两个问题:
圆形无锁缓冲是答案吗?
如果是这样,在我自己推出之前,您是否知道任何符合我需求的公共实施?
任何指向实现循环无锁缓冲区的指针总是受欢迎的.
顺便说一句,在Linux上用C++做这件事.
一些额外的信息:
响应时间对我的系统至关重要.理想情况下,消费者线程会希望尽快看到任何更新,因为额外的1毫秒延迟可能会使系统失去价值,或者价值更低.
我倾向于的设计思想是一个半无锁的循环缓冲区,生产者线程尽可能快地将数据放入缓冲区,让我们调用缓冲区A的头部,除非缓冲区已满,否则A满足缓冲区Z的结束.消费者线程将各自保存两个指向循环缓冲区P和P n的指针,其中P是线程的本地缓冲区头,P n是P 之后的第n个项目.每个消费者线程将推进其P和P ñ一旦处理完当前P和缓冲器指针Z的端前进具有最慢P ñ.当P赶上A,这意味着没有更新的处理更新,消费者旋转并忙着等待A再次前进.如果消费者线程旋转太长时间,它可以进入休眠状态并等待条件变量,但我没关系消费者占用CPU周期等待更新,因为这不会增加我的延迟(我会有更多的CPU核心)比线程).想象一下,你有一个循环轨道,并且生产者正在一群消费者面前运行,关键是调整系统,使生产者通常比消费者领先一步,并且大部分操作都可以使用无锁技术完成.我理解获得正确实施的细节并不容易......好吧,非常难,这就是为什么我想在自己制作一些错误之前先从别人的错误中吸取教训.
有趣的是,我发现很多程序员错误地认为"无锁"只意味着"没有互斥的并发编程".通常,还存在一个相关的误解,即编写无锁代码的目的是为了获得更好的并发性能.当然,无锁的正确定义实际上是关于进度保证.无锁算法保证至少一个线程能够前进,无论其他线程正在做什么.
这意味着无锁算法永远不会有一个代码,其中一个线程依赖于另一个线程才能继续.例如,无锁代码不能具有线程A设置标志的情况,然后线程B在等待线程A取消设置标志时保持循环.这样的代码基本上实现了一个锁(或者我称之为伪装的互斥锁).
然而,其他情况更微妙,在某些情况下我真的无法确定算法是否符合无锁定的要求,因为"取得进步"的概念有时对我来说似乎是主观的.
其中一个例子是(备受好评的,afaik)并发库liblfds.我正在研究liblfds中多生产者/多消费者有界队列的实现 - 实现非常简单,但我无法确定它是否应该符合无锁定条件.
相关算法在lfds711_queue_bmm_enqueue.c
.Liblfds使用自定义原子和内存障碍,但算法很简单,我可以用段落左右来描述.
队列本身是一个有界的连续数组(ringbuffer).共享read_index
和write_index
.队列中的每个插槽都包含一个用户数据字段和一个sequence_number
值,它基本上类似于一个纪元计数器.(这避免了ABA问题).
PUSH算法如下:
write_index
write_index % queue_size
使用试图设置write_index
为的CompareAndSwap循环在队列中保留一个插槽write_index + 1
.sequence_index
通过使其等于来更新插槽write_index + 1
.实际的源代码使用自定义原子和内存障碍,因此为了进一步清楚这个算法,我简要地将它翻译成(未经测试的)标准C++原子以获得更好的可读性,如下所示:
bool mcmp_queue::enqueue(void* data)
{
int write_index = m_write_index.load(std::memory_order_relaxed);
for (;;)
{
slot& s = m_slots[write_index % m_num_slots];
int sequence_number = s.sequence_number.load(std::memory_order_acquire);
int difference = sequence_number - write_index;
if (difference == 0)
{
if (m_write_index.compare_exchange_weak(
write_index,
write_index + …
Run Code Online (Sandbox Code Playgroud) 我正在为嵌入式系统(Cortex M0)编写代码,并没有所有奢侈的互斥锁/自旋锁/等等.有没有一种简单的方法将数据添加到共享缓冲区(日志文件),该缓冲区将从Main()循环刷新到磁盘?
如果只有一个生产者(1个中断)和单个消费者(主循环),我可以使用一个简单的缓冲区,生产者增加'head',消费者增加'tail'.这将是非常安全的.但现在我有多个生产者(中断)似乎我被卡住了.
我可以为每个中断提供自己的缓冲区,并将它们组合在Main()中,但这需要大量额外的RAM和复杂性.