C++低延迟线程异步缓冲流(用于记录) - Boost

486*_*-66 5 c++ multithreading boost asynchronous low-latency

问题:

3下面的循环包含已注释掉的代码.我搜索("TAG1","TAG2"和"TAG3")以便于识别.我只是希望while循环在测试的条件下等待,然后继续进行,同时尽可能地减少CPU资源.我首先尝试使用Boost条件变量,但是存在竞争条件.让线程休眠'x'微秒是低效的,因为没有办法精确计时唤醒.最后,boost :: this_thread :: yield()似乎没有做任何事情.可能是因为我在双核系统上只有2个活动线程.具体来说,如何在引入尽可能少的不必要阻塞的同时,使下面的三个标记区域更有效地运行.

背景

目的:

我有一个记录大量数据的应用程序.在分析之后,我发现在日志记录操作上花费了很多时间(将文本或二进制文件记录到本地硬盘上的文件中).我的目标是通过使用线程缓冲流记录器的调用替换非线程直接写入调用来减少logData调用的延迟.

选项探索:

  • 将2005年代的慢速硬盘升级到SSD ......可能.成本并不高昂......但涉及大量工作......超过200台计算机必须升级......
  • 提升ASIO ......我不需要所有的proactor /网络开销,寻找更简单,更轻量级的东西.

设计:

  • 生产者和消费者线程模式,应用程序将数据写入缓冲区和后台线程,然后稍后将其写入磁盘.因此,最终目标是让应用程序层调用的writeMessage函数尽可能快地返回,而数据在以后的某个时间以FIFO顺序正确/完全记录到日志文件中.
  • 只有一个应用程序线程,只有一个编写器线程
  • 基于环形缓冲区.这个决定的原因是使用尽可能少的锁,理想情况......如果我错了,请纠正我......我认为我不需要任何锁.
  • Buffer是一个静态分配的字符数组,但如果出于性能原因需要/期望,可以将其移动到堆中.
  • Buffer有一个开始指针,指向应写入文件的下一个字符.缓冲区有一个结束指针,指向要写入文件的最后一个字符后的数组索引.结束指针NEVER传递开始指针.如果出现大于缓冲区的消息,则编写器等待直到缓冲区清空并直接将新消息写入文件,而不将超大消息放入缓冲区(一旦缓冲区清空,工作线程)不会写任何东西,所以没有争论).
  • writer(工作线程)仅更新环形缓冲区的开始指针.
  • 主(应用程序线程)仅更新环形缓冲区的结束指针,并且当有可用空间时,它仅将新数据插入缓冲区...否则它将等待缓冲区中的空间变为可用或直接按描述写入以上.
  • 工作线程不断检查是否有要写入的数据(由缓冲区启动指针!=缓冲区结束指针时的情况表示).如果没有要写入的数据,一旦应用程序线程将某些东西插入缓冲区,工作线程理想情况下应进入休眠状态并唤醒(并更改缓冲区的结束指针,使其不再指向与开始时相同的索引)指针).我在下面的内容涉及while循环不断检查该条件.这是一种非常糟糕/低效的缓冲区等待方式.

结果:

  • 在我的2009年配备SSD的双核笔记本电脑上,我看到线程/缓冲基准测试与直接写入的总写入时间大约是1:6(0.609秒对0.095秒),但变化很大.通常,缓冲写入基准测试实际上比直接写入慢.我认为可变性是由于等待空间在缓冲区中释放,等待缓冲区清空以及让工作线程等待工作变得可用而实现的可能性差.我已经测量过一些while循环消耗超过10000个周期,我怀疑这些循环实际上是在争夺另一个线程(工作者或应用程序)完成等待计算所需的硬件资源.
  • 输出似乎结账.启用TEST模式并将10的小缓冲区作为压力测试,我将数百MB的输出分开并发现它等于输入.

使用当前版本的Boost进行编译(1.55)

    #ifndef BufferedLogStream_h
    #define BufferedLogStream_h

    #include <stdio.h>
    #include <iostream>
    #include <iostream>
    #include <cstdlib>
    #include "boost\chrono\chrono.hpp"
    #include "boost\thread\thread.hpp"
    #include "boost\thread\locks.hpp"
    #include "boost\thread\mutex.hpp"
    #include "boost\thread\condition_variable.hpp"
    #include <time.h>

    using namespace std;

    #define BENCHMARK_STR_SIZE 128
    #define NUM_BENCHMARK_WRITES 524288
    #define TEST 0
    #define BENCHMARK 1
    #define WORKER_LOOP_WAIT_MICROSEC 20
    #define MAIN_LOOP_WAIT_MICROSEC 10

    #if(TEST)
    #define BUFFER_SIZE 10 
    #else 
    #define BUFFER_SIZE 33554432 //4 MB
    #endif

    class BufferedLogStream {
        public:
            BufferedLogStream();
            void openFile(char* filename);
            void flush();
            void close();
            inline void writeMessage(const char* message, unsigned int length);
            void writeMessage(string message);
            bool operator() () { return start != end; }

        private:
            void threadedWriter();
            inline bool hasSomethingToWrite();
            inline unsigned int getFreeSpaceInBuffer();
            void appendStringToBuffer(const char* message, unsigned int length);

            FILE* fp;
            char* start;
            char* end;
            char* endofringbuffer;
            char ringbuffer[BUFFER_SIZE];
            bool workerthreadkeepalive;
            boost::mutex mtx;
            boost::condition_variable waitforempty;
            boost::mutex workmtx;
            boost::condition_variable waitforwork;

            #if(TEST)
            struct testbuffer {
                int length;
                char message[BUFFER_SIZE * 2];
            };

            public:
                void test();

            private:
                void getNextRandomTest(testbuffer &tb);
                FILE* datatowrite;
            #endif

        #if(BENCHMARK)
            public:
                void runBenchmark();

            private:
                void initBenchmarkString();
                void runDirectWriteBaseline();
                void runBufferedWriteBenchmark();

                char benchmarkstr[BENCHMARK_STR_SIZE];
        #endif
    };

    #if(TEST)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->openFile("replicated.txt");
        bl->test();
        bl->close();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif

    #if(BENCHMARK)
    int main() {
        BufferedLogStream* bl = new BufferedLogStream();
        bl->runBenchmark();
        cout << "Done" << endl;
        cin.get();
        return 0;
    }
    #endif //for benchmark

    #endif
Run Code Online (Sandbox Code Playgroud)

履行

    #include "BufferedLogStream.h"

    BufferedLogStream::BufferedLogStream() {
        fp = NULL;
        start = ringbuffer;
        end = ringbuffer;
        endofringbuffer = ringbuffer + BUFFER_SIZE;
        workerthreadkeepalive = true;
    }

    void BufferedLogStream::openFile(char* filename) {
        if(fp) close();
        workerthreadkeepalive = true;
        boost::thread t2(&BufferedLogStream::threadedWriter, this);
        fp = fopen(filename, "w+b");
    }

    void BufferedLogStream::flush() {
        fflush(fp); 
    }

    void BufferedLogStream::close() {
        workerthreadkeepalive = false;
        if(!fp) return;
        while(hasSomethingToWrite()) {
            boost::unique_lock<boost::mutex> u(mtx);
            waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
        }
        flush();        
        fclose(fp);             
        fp = NULL;          
    }

    void BufferedLogStream::threadedWriter() {
        while(true) {
            if(start != end) {
                char* currentend = end;
                if(start < currentend) {
                    fwrite(start, 1, currentend - start, fp);
                }
                else if(start > currentend) {
                    if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp); 
                    fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
                }
                start = currentend;
                waitforempty.notify_one();
            }
            else { //start == end...no work to do
                if(!workerthreadkeepalive) return;
                boost::unique_lock<boost::mutex> u(workmtx);
                waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
            }
        }
    }

    bool BufferedLogStream::hasSomethingToWrite() {
        return start != end;
    }

    void BufferedLogStream::writeMessage(string message) {
        writeMessage(message.c_str(), message.length());
    }

    unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
        if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
        if(end == start) return BUFFER_SIZE-1;
        return start - end - 1; //case where start > end
    }

    void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
        if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
            memcpy(end, message, length);
            end += length;
        }
        else {
            int lengthtoendofbuffer = endofringbuffer - end;
            if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
            int remainderlength =  length - lengthtoendofbuffer;
            memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
            end = ringbuffer + remainderlength;
        }
    }

    void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
        if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
            while(hasSomethingToWrite()); {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            fwrite(message, 1, length, fp);
        }
        else {
            //wait until there is enough free space to insert new string
            while(getFreeSpaceInBuffer() < length) {
                boost::unique_lock<boost::mutex> u(mtx);
                waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
            }
            appendStringToBuffer(message, length);
        }
        waitforwork.notify_one();
    }

    #if(TEST)
        void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
            tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
            for(int i = 0; i < tb.length; i++) {
                tb.message[i] = rand() % 26 + 65;
            }
            tb.message[tb.length] = '\n';
            tb.length++;
            tb.message[tb.length] = '\0';
        }

        void BufferedLogStream::test() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            testbuffer tb;
            datatowrite = fopen("orig.txt", "w+b");
            for(unsigned int i = 0; i < 7000000; i++) {
                if(i % 1000000 == 0) cout << i << endl;
                getNextRandomTest(tb);
                writeMessage(tb.message, tb.length);
                fwrite(tb.message, 1, tb.length, datatowrite);
            }       
            fflush(datatowrite);
            fclose(datatowrite);
        }
    #endif

    #if(BENCHMARK) 
        void BufferedLogStream::initBenchmarkString() {
            for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
                benchmarkstr[i] = rand() % 26 + 65;
            }
            benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
        }

        void BufferedLogStream::runDirectWriteBaseline() {
            clock_t starttime = clock();
            fp = fopen("BenchMarkBaseline.txt", "w+b");
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
            }   
            fflush(fp);
            fclose(fp);
            clock_t elapsedtime = clock() - starttime;
            cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBufferedWriteBenchmark() {
            clock_t starttime = clock();
            openFile("BufferedBenchmark.txt");
            cout << "Opend file" << endl;
            for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
                writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
            }   
            cout << "Wrote" << endl;
            close();
            cout << "Close" << endl;
            clock_t elapsedtime = clock() - starttime;
            cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
        }

        void BufferedLogStream::runBenchmark() {
            cout << "Buffer size is: " << BUFFER_SIZE << endl;
            initBenchmarkString();
            runDirectWriteBaseline();
            runBufferedWriteBenchmark();
        }
    #endif
Run Code Online (Sandbox Code Playgroud)

更新:2013年11月25日

我使用boost :: condition_variables更新了下面的代码,特别是Evgeny Panasyuk推荐的wait_for()方法.这避免了不必要地一遍又一遍地检查相同的条件.我目前看到缓冲版本在大约1/6的时间内作为无缓冲/直接写入版本运行.这不是理想的情况,因为两种情况都受到硬盘的限制(在我的情况下是2010年的SSD).我打算在硬盘不是瓶颈的环境中使用下面的代码,大多数情况下(如果不是所有的话),缓冲区应该有可用空间来容纳writeMessage请求.这让我想到了下一个问题.我应该有多大的缓冲区?我不介意分配32 MB或64 MB以确保它永远不会填满.代码将在可以节省它的系统上运行.直觉上,我觉得静态分配32 MB字符数组是个坏主意.是吗?无论如何,我希望当我为我的预期应用程序运行下面的代码时,logData()调用的延迟将大大减少,这将显着减少整体处理时间.

如果有人看到任何方法使下面的代码更好(更快,更强大,更精简等),请告诉我.我很感激反馈.Lazin,您的方法将如何比我在下面发布的更快或更有效?我有点像只有一个缓冲区并使其足够大以至于它几乎不会填满的想法.然后我不必担心从不同的缓冲区读取.Evgeny Panasyuk,我喜欢尽可能使用现有代码的方法,特别是如果它是一个现有的boost库.但是,我也没有看到spcs_queue如何比我下面的更有效.我宁愿处理一个大缓冲区而不是许多小缓冲区,并且不得不担心拆分在输入上拆分输入流并将其拼接在输出上.您的方法将允许我将格式从主线程卸载到工作线程上.这是一种切割方法.但我不确定它是否会节省大量时间并实现全部好处,我将不得不修改我不拥有的代码.

//结束更新

Evg*_*zin 6

一般解决方案

我想你必须看看Naggle算法.对于一个生产者和一个消费者,这看起来像这样:

  • 在开始缓冲区为空时,工作线程处于空闲状态并等待事件.
  • 生产者将数据写入缓冲区并通知工作线程.
  • 工作线程醒来并开始写操作.
  • 生产者尝试编写另一条消息,但缓冲区由worker使用,因此生产者分配另一个缓冲区并向其写入消息.
  • 生产者尝试编写另一条消息,I/O仍在进行中,因此生产者将消息写入先前分配的缓冲区.
  • 工作线程完成了向文件写入缓冲区,并看到有另一个数据缓冲区,因此它抓取它并开始写入.
  • 生产者使用第一个缓冲区来写入所有连续的消息,直到第二个写入操作正在进行中.

此架构将有助于实现低延迟要求,单个消息将立即写入光盘,但大批量的事件将被写入以获得更高的吞吐量.

如果您的日志消息具有级别 - 您可以稍微改进此模式.所有错误消息都具有高优先级(级别),必须立即保存在光盘上(因为它们很少但非常有价值)但调试和跟踪消息的优先级较低,可以缓冲以节省带宽(因为它们非常频繁但不是很有价值作为错误和信息消息).所以,当你写error邮件,则必须等到工作线程完成编写信息(这是在同一个缓冲区中的所有消息),然后继续,但调试和跟踪消息可以仅仅写入缓冲.

线程.

每个应用程序线程产生工作线程的成本很高.您必须为每个日志文件使用单个写入程序线程.必须在线程之间共享写缓冲区.每个缓冲区必须有两个指针 - commit_pointerprepare_pointer.缓冲区开头之间的所有缓冲区空间commit_pointer都可用于工作线程.之间的缓冲空间commit_pointer以及prepare_pointer当前由应用程序线程更新.不变量:commit_pointer<= prepare_pointer.

写操作可以分两步进行.

  1. 准备写.此操作在缓冲区中保留空间.
    • 生产者计算len(消息)和原子更新prepare_pointer;
    • prepare_pointer值和len由消费者保存;
  2. 提交写.
    • 生产者在保留缓冲区空间的开头写入消息(旧的prepare_pointer值).
    • 生产者忙等待直到commit_pointer等于prepare_pointer它保存在局部变量中的旧值.
    • 生产者通过执行commit_pointer= commit_pointer+ len atomically 提交写操作.

为了防止错误共享,可以将len(message)四舍五入到缓存行大小,并且可以用空格填充所有额外空间.

// pseudocode
void write(const char* message) {
    int len = strlen(message);  // TODO: round to cache line size
    const char* old_prepare_ptr;
    // Prepare step
    while(1) 
    {
        old_prepare_ptr = prepare_ptr;
        if (
            CAS(&prepare_ptr, 
                 old_prepare_ptr, 
                 prepare_ptr + len) == old_prepare_ptr
            )
            break;
        // retry if another thread perform prepare op.
    }
    // Write message
    memcpy((void*)old_prepare_ptr, (void*)message, len);
    // Commit step
    while(1)
    {
        const char* old_commit_ptr = commit_ptr;
        if (
             CAS(&commit_ptr, 
                  old_commit_ptr, 
                  old_commit_ptr + len) == old_commit_ptr
            )
            break;
        // retry if another thread commits
    }
    notify_worker_thread();
}
Run Code Online (Sandbox Code Playgroud)