圆形无锁缓冲区

Shi*_*Yip 69 c++ algorithm concurrency multithreading lock-free

我正在设计一个连接到一个或多个数据源流的系统,并对数据进行一些分析,而不是基于结果触发事件.在典型的多线程生产者/消费者设置中,我将有多个生产者线程将数据放入队列,并且多个消费者线程读取数据,并且消费者仅对最新数据点加上n个点感兴趣.如果慢速消费者无法跟上,生产者线程将不得不阻止,当然,当没有未经处理的更新时,消费者线程将被阻止.使用具有读取器/写入器锁的典型并发队列将很好地工作,但是进入的数据速率可能很大,因此我希望减少锁定开销,尤其是生产者的写入锁.我认为我需要一个循环无锁缓冲区.

现在有两个问题:

  1. 圆形无锁缓冲是答案吗?

  2. 如果是这样,在我自己推出之前,您是否知道任何符合我需求的公共实施?

任何指向实现循环无锁缓冲区的指针总是受欢迎的.

顺便说一句,在Linux上用C++做这件事.

一些额外的信息:

响应时间对我的系统至关重要.理想情况下,消费者线程会希望尽快看到任何更新,因为额外的1毫秒延迟可能会使系统失去价值,或者价值更低.

我倾向于的设计思想是一个半无锁的循环缓冲区,生产者线程尽可能快地将数据放入缓冲区,让我们调用缓冲区A的头部,除非缓冲区已满,否则A满足缓冲区Z的结束.消费者线程将各自保存两个指向循环缓冲区P和P n的指针,其中P是线程的本地缓冲区头,P n是P 之后的第n个项目.每个消费者线程将推进其P和P ñ一旦处理完当前P和缓冲器指针Z的端前进具有最慢P ñ.当P赶上A,这意味着没有更新的处理更新,消费者旋转并忙着等待A再次前进.如果消费者线程旋转太长时间,它可以进入休眠状态并等待条件变量,但我没关系消费者占用CPU周期等待更新,因为这不会增加我的延迟(我会有更多的CPU核心)比线程).想象一下,你有一个循环轨道,并且生产者正在一群消费者面前运行,关键是调整系统,使生产者通常比消费者领先一步,并且大部分操作都可以使用无锁技术完成.我理解获得正确实施的细节并不容易......好吧,非常难,这就是为什么我想在自己制作一些错误之前先从别人的错误中吸取教训.

小智 37

在过去的几年里,我对无锁数据结构做了一个特别的研究.我已经阅读了该领域的大部分论文(只有大约四十个左右 - 虽然只有大约十或十五个是真的有用:-)

AFAIK,一种无锁循环缓冲器尚未发明.问题将是处理读者超越作家或反之亦然的复杂情况.

如果您没有花费至少六个月的时间研究无锁数据结构,请不要尝试自己编写一个.您将错误地看到错误存在可能并不明显,直到您的代码在部署后在新平台上失败.

不过我相信你的要求有一个解决方案.

您应该将无锁队列与无锁空闲列表配对.

自由列表将为您提供预分配,从而避免了对无锁分配器的(财政上昂贵的)要求; 当free-list为空时,通过立即从队列中取出一个元素并使用它来复制循环缓冲区的行为.

(当然,在基于锁的循环缓冲区中,一旦获得锁定,获取元素非常快 - 基本上只是一个指针取消引用 - 但你不会在任何无锁算法中得到它;它们通常必须去他们的工作方式很糟糕;自由列表弹出失败以及出列失败的开销与任何无锁算法需要做的工作量相当.

迈克尔和斯科特在1996年开发了一个非常好的无锁队列.下面的链接将为您提供足够的详细信息来追踪他们论文的PDF; 迈克尔和斯科特,FIFO

一个无锁的免费列表是最简单的无锁算法,实际上我认为我没有看到它的实际论文.

  • 我可能错了,但我认为除非volatile执行内存屏障,否则它不会对编译器或CPU的重新排序行为产生任何影响.它所要做的就是确保从内存而不是寄存器或缓存中读取其值. (3认同)
  • IIRC,`volatile` 内存访问不会与其他 `volatile` 访问重新排序。但在 ISO C 中,这就是你所得到的。在 MSVC 中,`volatile` 远不止于此,但现在你应该只使用 `std::atomic` 和 `memory_order_release` 或 `seq_cst` 或任何你想要的。 (2认同)

Nor*_*sey 33

您想要的艺术术语是无锁队列.Ross Bencina 提供一系列优秀的笔记,其中包含代码和论文的链接.我最信任的人是Maurice Herlihy(对于美国人来说,他的名字叫"莫里斯").

  • @Blank Xavier:不,但循环缓冲区是一个队列.该问题需要一个队列.并且队列的最有效实现是......(等待它)循环缓冲区.在任何情况下,如果要搜索,则搜索"无锁队列",而不是"无锁循环缓冲区". (25认同)
  • @Tomas:无锁队列可以更好,因为没有单一锁作为性能瓶颈.OP特别关注在争用非常高时减少锁定开销.信号量无助于争用.一个无锁队列. (6认同)
  • 队列不是循环缓冲区. (2认同)

Dou*_*oug 11

如果缓冲区为空或满,则生产者或消费者阻止的要求表明您应该使用正常的锁定数据结构,使用信号量或条件变量来阻止生成者和使用者阻塞,直到数据可用.无锁代码通常不会阻止这种情况 - 它会旋转或放弃无法执行的操作,而不是使用操作系统阻塞.(如果你能够等到另一个线程产生或消耗数据,那么为什么等待锁定另一个线程来完成更新数据结构呢?)

在(x86/x64)Linux上,如果没有争用,使用互斥锁的线程内同步相当便宜.集中精力减少生产者和消费者需要抓住锁的时间.鉴于你已经说过你只关心最后N个记录的数据点,我认为循环缓冲区可以做得相当好.但是,我真的不明白这是如何符合阻止要求以及消费者实际消费(删除)他们阅读的数据的想法.(您是否希望消费者只查看最后N个数据点,而不是删除它们?您是否希望生产者不关心消费者是否无法跟上,只是覆盖旧数据?)

此外,正如Zan Lynx评论的那样,当你有大量的数据进入时,你可以将你的数据聚合/缓冲到更大的块中.你可以缓冲固定数量的点,或者在一定时间内收到的所有数据.这意味着将减少同步操作.但它确实引入了延迟,但是如果你不使用实时Linux,那么无论如何你都必须在某种程度上处理它.


Hen*_*man 5

在DDJ上有很多关于此的文章.作为这些东西有多么困难的标志,这是对之前文章的错误纠正.确保在推出自己的错误之前了解错误) - ;


Nik*_*sov 5

减少争用的一种有用技术是将项目散列到多个队列中,并让每个消费者专用于一个“主题”。

对于您的消费者感兴趣的最新数量的项目 - 您不想锁定整个队列并迭代它以查找要覆盖的项目 - 只需发布 N 元组中的项目,即所有 N 个最近的项目。实现的奖励点是,生产者会在超时的情况下阻塞整个队列(当消费者无法跟上时),更新其本地元组缓存 - 这样就不会对数据源施加背压。


ram*_*oti 5

萨特的队列不是最优的,他知道这一点。《多核编程的艺术》是一本很好的参考书,但不要相信 Java 人员的内存模型。罗斯的链接不会给你明确的答案,因为他们的图书馆有这样的问题等等。

进行无锁编程就是自找麻烦,除非你想在解决问题之前花费大量时间在明显过度设计的东西上(从它的描述来看,这是一种“追求完美”的常见疯狂行为) ' 在缓存一致性中)。需要花费数年时间,导致不先解决问题,然后再优化,这是一种常见病。

  • Dennis 的站点已移至 http://landenlabs.com/code/ring/ring.html - 它具有无锁环形缓冲区。 (2认同)

小智 5

我不是硬件内存模型和无锁数据结构的专家,我倾向于避免在我的项目中使用它们,我使用传统的锁定数据结构。

但是我最近注意到视频: 基于环形缓冲区的无锁 SPSC 队列

这是基于交易系统使用的名为 LMAX distruptor 的开源高性能 Java 库:LMAX Distruptor

根据上面的介绍,您可以将头指针和尾指针原子化,并原子地检查头部从后面捕捉尾部的情况,反之亦然。

下面你可以看到一个非常基本的 C++11 实现:

// USING SEQUENTIAL MEMORY
#include<thread>
#include<atomic>
#include <cinttypes>
using namespace std;

#define RING_BUFFER_SIZE 1024  // power of 2 for efficient %
class lockless_ring_buffer_spsc
{
    public :

        lockless_ring_buffer_spsc()
        {
            write.store(0);
            read.store(0);
        }

        bool try_push(int64_t val)
        {
            const auto current_tail = write.load();
            const auto next_tail = increment(current_tail);
            if (next_tail != read.load())
            {
                buffer[current_tail] = val;
                write.store(next_tail);
                return true;
            }

            return false;  
        }

        void push(int64_t val)
        {
            while( ! try_push(val) );
            // TODO: exponential backoff / sleep
        }

        bool try_pop(int64_t* pval)
        {
            auto currentHead = read.load();

            if (currentHead == write.load())
            {
                return false;
            }

            *pval = buffer[currentHead];
            read.store(increment(currentHead));

            return true;
        }

        int64_t pop()
        {
            int64_t ret;
            while( ! try_pop(&ret) );
            // TODO: exponential backoff / sleep
            return ret;
        }

    private :
        std::atomic<int64_t> write;
        std::atomic<int64_t> read;
        static const int64_t size = RING_BUFFER_SIZE;
        int64_t buffer[RING_BUFFER_SIZE];

        int64_t increment(int n)
        {
            return (n + 1) % size;
        }
};

int main (int argc, char** argv)
{
    lockless_ring_buffer_spsc queue;

    std::thread write_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.push(i);
             }
         }  // End of lambda expression
                                                );
    std::thread read_thread( [&] () {
             for(int i = 0; i<1000000; i++)
             {
                    queue.pop();
             }
         }  // End of lambda expression
                                                );
    write_thread.join();
    read_thread.join();

     return 0;
}
Run Code Online (Sandbox Code Playgroud)


Ale*_*lex 5

boost库中的实现值得考虑.它易于使用且性能相当高.我写了一个测试并在四核i7笔记本电脑上运行它(8个线程)并且每秒获得约4M的排队/出队操作.到目前为止未提及的另一个实现是位于http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue的MPMC队列.我已经在32台生产商和32位消费者的同一台笔记本电脑上进行了一些简单的测试.正如所宣传的那样,它比增强无锁队列更快.

由于大多数其他答案状态无锁编程很难.大多数实现都很难检测出需要进行大量测试和调试才能修复的极端情况.通常在代码中小心放置内存屏障来修复这些问题.您还可以在许多学术文章中找到正确性证明.我更喜欢使用强力工具测试这些实现.应使用http://research.microsoft.com/en-us/um/people/lamport/tla/tla.html等工具检查您计划在生产中使用的任何无锁算法的正确性.