486*_*-66 5 c++ multithreading boost asynchronous low-latency
问题:
3下面的循环包含已注释掉的代码.我搜索("TAG1","TAG2"和"TAG3")以便于识别.我只是希望while循环在测试的条件下等待,然后继续进行,同时尽可能地减少CPU资源.我首先尝试使用Boost条件变量,但是存在竞争条件.让线程休眠'x'微秒是低效的,因为没有办法精确计时唤醒.最后,boost :: this_thread :: yield()似乎没有做任何事情.可能是因为我在双核系统上只有2个活动线程.具体来说,如何在引入尽可能少的不必要阻塞的同时,使下面的三个标记区域更有效地运行.
背景
目的:
我有一个记录大量数据的应用程序.在分析之后,我发现在日志记录操作上花费了很多时间(将文本或二进制文件记录到本地硬盘上的文件中).我的目标是通过使用线程缓冲流记录器的调用替换非线程直接写入调用来减少logData调用的延迟.
选项探索:
设计:
结果:
使用当前版本的Boost进行编译(1.55)
头
#ifndef BufferedLogStream_h
#define BufferedLogStream_h
#include <stdio.h>
#include <iostream>
#include <iostream>
#include <cstdlib>
#include "boost\chrono\chrono.hpp"
#include "boost\thread\thread.hpp"
#include "boost\thread\locks.hpp"
#include "boost\thread\mutex.hpp"
#include "boost\thread\condition_variable.hpp"
#include <time.h>
using namespace std;
#define BENCHMARK_STR_SIZE 128
#define NUM_BENCHMARK_WRITES 524288
#define TEST 0
#define BENCHMARK 1
#define WORKER_LOOP_WAIT_MICROSEC 20
#define MAIN_LOOP_WAIT_MICROSEC 10
#if(TEST)
#define BUFFER_SIZE 10
#else
#define BUFFER_SIZE 33554432 //4 MB
#endif
class BufferedLogStream {
public:
BufferedLogStream();
void openFile(char* filename);
void flush();
void close();
inline void writeMessage(const char* message, unsigned int length);
void writeMessage(string message);
bool operator() () { return start != end; }
private:
void threadedWriter();
inline bool hasSomethingToWrite();
inline unsigned int getFreeSpaceInBuffer();
void appendStringToBuffer(const char* message, unsigned int length);
FILE* fp;
char* start;
char* end;
char* endofringbuffer;
char ringbuffer[BUFFER_SIZE];
bool workerthreadkeepalive;
boost::mutex mtx;
boost::condition_variable waitforempty;
boost::mutex workmtx;
boost::condition_variable waitforwork;
#if(TEST)
struct testbuffer {
int length;
char message[BUFFER_SIZE * 2];
};
public:
void test();
private:
void getNextRandomTest(testbuffer &tb);
FILE* datatowrite;
#endif
#if(BENCHMARK)
public:
void runBenchmark();
private:
void initBenchmarkString();
void runDirectWriteBaseline();
void runBufferedWriteBenchmark();
char benchmarkstr[BENCHMARK_STR_SIZE];
#endif
};
#if(TEST)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->openFile("replicated.txt");
bl->test();
bl->close();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif
#if(BENCHMARK)
int main() {
BufferedLogStream* bl = new BufferedLogStream();
bl->runBenchmark();
cout << "Done" << endl;
cin.get();
return 0;
}
#endif //for benchmark
#endif
Run Code Online (Sandbox Code Playgroud)
履行
#include "BufferedLogStream.h"
BufferedLogStream::BufferedLogStream() {
fp = NULL;
start = ringbuffer;
end = ringbuffer;
endofringbuffer = ringbuffer + BUFFER_SIZE;
workerthreadkeepalive = true;
}
void BufferedLogStream::openFile(char* filename) {
if(fp) close();
workerthreadkeepalive = true;
boost::thread t2(&BufferedLogStream::threadedWriter, this);
fp = fopen(filename, "w+b");
}
void BufferedLogStream::flush() {
fflush(fp);
}
void BufferedLogStream::close() {
workerthreadkeepalive = false;
if(!fp) return;
while(hasSomethingToWrite()) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
flush();
fclose(fp);
fp = NULL;
}
void BufferedLogStream::threadedWriter() {
while(true) {
if(start != end) {
char* currentend = end;
if(start < currentend) {
fwrite(start, 1, currentend - start, fp);
}
else if(start > currentend) {
if(start != endofringbuffer) fwrite(start, 1, endofringbuffer - start, fp);
fwrite(ringbuffer, 1, currentend - ringbuffer, fp);
}
start = currentend;
waitforempty.notify_one();
}
else { //start == end...no work to do
if(!workerthreadkeepalive) return;
boost::unique_lock<boost::mutex> u(workmtx);
waitforwork.wait_for(u, boost::chrono::microseconds(WORKER_LOOP_WAIT_MICROSEC));
}
}
}
bool BufferedLogStream::hasSomethingToWrite() {
return start != end;
}
void BufferedLogStream::writeMessage(string message) {
writeMessage(message.c_str(), message.length());
}
unsigned int BufferedLogStream::getFreeSpaceInBuffer() {
if(end > start) return (start - ringbuffer) + (endofringbuffer - end) - 1;
if(end == start) return BUFFER_SIZE-1;
return start - end - 1; //case where start > end
}
void BufferedLogStream::appendStringToBuffer(const char* message, unsigned int length) {
if(end + length <= endofringbuffer) { //most common case for appropriately-sized buffer
memcpy(end, message, length);
end += length;
}
else {
int lengthtoendofbuffer = endofringbuffer - end;
if(lengthtoendofbuffer > 0) memcpy(end, message, lengthtoendofbuffer);
int remainderlength = length - lengthtoendofbuffer;
memcpy(ringbuffer, message + lengthtoendofbuffer, remainderlength);
end = ringbuffer + remainderlength;
}
}
void BufferedLogStream::writeMessage(const char* message, unsigned int length) {
if(length > BUFFER_SIZE - 1) { //if string is too large for buffer, wait for buffer to empty and bypass buffer, write directly to file
while(hasSomethingToWrite()); {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
fwrite(message, 1, length, fp);
}
else {
//wait until there is enough free space to insert new string
while(getFreeSpaceInBuffer() < length) {
boost::unique_lock<boost::mutex> u(mtx);
waitforempty.wait_for(u, boost::chrono::microseconds(MAIN_LOOP_WAIT_MICROSEC));
}
appendStringToBuffer(message, length);
}
waitforwork.notify_one();
}
#if(TEST)
void BufferedLogStream::getNextRandomTest(testbuffer &tb) {
tb.length = 1 + (rand() % (int)(BUFFER_SIZE * 1.05));
for(int i = 0; i < tb.length; i++) {
tb.message[i] = rand() % 26 + 65;
}
tb.message[tb.length] = '\n';
tb.length++;
tb.message[tb.length] = '\0';
}
void BufferedLogStream::test() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
testbuffer tb;
datatowrite = fopen("orig.txt", "w+b");
for(unsigned int i = 0; i < 7000000; i++) {
if(i % 1000000 == 0) cout << i << endl;
getNextRandomTest(tb);
writeMessage(tb.message, tb.length);
fwrite(tb.message, 1, tb.length, datatowrite);
}
fflush(datatowrite);
fclose(datatowrite);
}
#endif
#if(BENCHMARK)
void BufferedLogStream::initBenchmarkString() {
for(unsigned int i = 0; i < BENCHMARK_STR_SIZE - 1; i++) {
benchmarkstr[i] = rand() % 26 + 65;
}
benchmarkstr[BENCHMARK_STR_SIZE - 1] = '\n';
}
void BufferedLogStream::runDirectWriteBaseline() {
clock_t starttime = clock();
fp = fopen("BenchMarkBaseline.txt", "w+b");
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
fwrite(benchmarkstr, 1, BENCHMARK_STR_SIZE, fp);
}
fflush(fp);
fclose(fp);
clock_t elapsedtime = clock() - starttime;
cout << "Direct write baseline took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBufferedWriteBenchmark() {
clock_t starttime = clock();
openFile("BufferedBenchmark.txt");
cout << "Opend file" << endl;
for(unsigned int i = 0; i < NUM_BENCHMARK_WRITES; i++) {
writeMessage(benchmarkstr, BENCHMARK_STR_SIZE);
}
cout << "Wrote" << endl;
close();
cout << "Close" << endl;
clock_t elapsedtime = clock() - starttime;
cout << "Buffered write took " << ((double) elapsedtime) / CLOCKS_PER_SEC << " seconds." << endl;
}
void BufferedLogStream::runBenchmark() {
cout << "Buffer size is: " << BUFFER_SIZE << endl;
initBenchmarkString();
runDirectWriteBaseline();
runBufferedWriteBenchmark();
}
#endif
Run Code Online (Sandbox Code Playgroud)
更新:2013年11月25日
我使用boost :: condition_variables更新了下面的代码,特别是Evgeny Panasyuk推荐的wait_for()方法.这避免了不必要地一遍又一遍地检查相同的条件.我目前看到缓冲版本在大约1/6的时间内作为无缓冲/直接写入版本运行.这不是理想的情况,因为两种情况都受到硬盘的限制(在我的情况下是2010年的SSD).我打算在硬盘不是瓶颈的环境中使用下面的代码,大多数情况下(如果不是所有的话),缓冲区应该有可用空间来容纳writeMessage请求.这让我想到了下一个问题.我应该有多大的缓冲区?我不介意分配32 MB或64 MB以确保它永远不会填满.代码将在可以节省它的系统上运行.直觉上,我觉得静态分配32 MB字符数组是个坏主意.是吗?无论如何,我希望当我为我的预期应用程序运行下面的代码时,logData()调用的延迟将大大减少,这将显着减少整体处理时间.
如果有人看到任何方法使下面的代码更好(更快,更强大,更精简等),请告诉我.我很感激反馈.Lazin,您的方法将如何比我在下面发布的更快或更有效?我有点像只有一个缓冲区并使其足够大以至于它几乎不会填满的想法.然后我不必担心从不同的缓冲区读取.Evgeny Panasyuk,我喜欢尽可能使用现有代码的方法,特别是如果它是一个现有的boost库.但是,我也没有看到spcs_queue如何比我下面的更有效.我宁愿处理一个大缓冲区而不是许多小缓冲区,并且不得不担心拆分在输入上拆分输入流并将其拼接在输出上.您的方法将允许我将格式从主线程卸载到工作线程上.这是一种切割方法.但我不确定它是否会节省大量时间并实现全部好处,我将不得不修改我不拥有的代码.
//结束更新
一般解决方案
我想你必须看看Naggle算法.对于一个生产者和一个消费者,这看起来像这样:
此架构将有助于实现低延迟要求,单个消息将立即写入光盘,但大批量的事件将被写入以获得更高的吞吐量.
如果您的日志消息具有级别 - 您可以稍微改进此模式.所有错误消息都具有高优先级(级别),必须立即保存在光盘上(因为它们很少但非常有价值)但调试和跟踪消息的优先级较低,可以缓冲以节省带宽(因为它们非常频繁但不是很有价值作为错误和信息消息).所以,当你写error邮件,则必须等到工作线程完成编写信息(这是在同一个缓冲区中的所有消息),然后继续,但调试和跟踪消息可以仅仅写入缓冲.
线程.
每个应用程序线程产生工作线程的成本很高.您必须为每个日志文件使用单个写入程序线程.必须在线程之间共享写缓冲区.每个缓冲区必须有两个指针 - commit_pointer和prepare_pointer.缓冲区开头之间的所有缓冲区空间commit_pointer都可用于工作线程.之间的缓冲空间commit_pointer以及prepare_pointer当前由应用程序线程更新.不变量:commit_pointer<= prepare_pointer.
写操作可以分两步进行.
prepare_pointer;prepare_pointer值和len由消费者保存;commit_pointer等于prepare_pointer它保存在局部变量中的旧值.commit_pointer= commit_pointer+ len atomically 提交写操作.为了防止错误共享,可以将len(message)四舍五入到缓存行大小,并且可以用空格填充所有额外空间.
// pseudocode
void write(const char* message) {
int len = strlen(message); // TODO: round to cache line size
const char* old_prepare_ptr;
// Prepare step
while(1)
{
old_prepare_ptr = prepare_ptr;
if (
CAS(&prepare_ptr,
old_prepare_ptr,
prepare_ptr + len) == old_prepare_ptr
)
break;
// retry if another thread perform prepare op.
}
// Write message
memcpy((void*)old_prepare_ptr, (void*)message, len);
// Commit step
while(1)
{
const char* old_commit_ptr = commit_ptr;
if (
CAS(&commit_ptr,
old_commit_ptr,
old_commit_ptr + len) == old_commit_ptr
)
break;
// retry if another thread commits
}
notify_worker_thread();
}
Run Code Online (Sandbox Code Playgroud)