And*_*ott 10 c++ file-io asynchronous c++11
我有一些长循环,我需要在每次迭代时将一些数据写入文件.问题是写入文件可能很慢,所以我希望通过异步写入来减少这个时间.
有谁知道这样做的好方法?我应该创建一个线程,通过写出来消耗放入缓冲区的任何东西(在这种情况下,单个生产者,单个消费者)?
我主要感兴趣的是除了标准库(C++ 11)之外什么都不涉及的解决方案.
Die*_*ühl 16
在进入异步写入之前,如果您正在使用IOStream,您可能希望尝试避免意外刷新流,例如,不使用std::endl而是使用'\n'.由于写入IOStreams是缓冲的,因此可以提高性能.
如果这还不够,那么下一个问题就是如何编写数据.如果正在进行大量格式化,则实际格式化有可能占用大部分时间.您可能能够将格式化推送到单独的线程中,但这与仅仅将几个字节写入另一个线程完全不同:您需要传递一个合适的数据结构来保存要格式化的数据.但是,什么是合适的取决于你实际写的是什么.
最后,如果将缓冲区写入文件确实是瓶颈并且您希望坚持使用标准C++库,那么让编写器线程监听充满来自合适流缓冲区的缓冲区的队列并写入缓冲区可能是合理的.到std::ofstream:生成器接口将是一个std::ostream可能在缓冲区已满或者刷新流(我已std::flush明确使用它)到另一个读取侦听的队列时发送可能固定大小的缓冲区的接口.以下是仅使用标准库工具的快速实现:
#include <condition_variable>
#include <fstream>
#include <mutex>
#include <queue>
#include <streambuf>
#include <string>
#include <thread>
#include <vector>
struct async_buf
: std::streambuf
{
std::ofstream out;
std::mutex mutex;
std::condition_variable condition;
std::queue<std::vector<char>> queue;
std::vector<char> buffer;
bool done;
std::thread thread;
void worker() {
bool local_done(false);
std::vector<char> buf;
while (!local_done) {
{
std::unique_lock<std::mutex> guard(this->mutex);
this->condition.wait(guard,
[this](){ return !this->queue.empty()
|| this->done; });
if (!this->queue.empty()) {
buf.swap(queue.front());
queue.pop();
}
local_done = this->queue.empty() && this->done;
}
if (!buf.empty()) {
out.write(buf.data(), std::streamsize(buf.size()));
buf.clear();
}
}
out.flush();
}
public:
async_buf(std::string const& name)
: out(name)
, buffer(128)
, done(false)
, thread(&async_buf::worker, this) {
this->setp(this->buffer.data(),
this->buffer.data() + this->buffer.size() - 1);
}
~async_buf() {
std::unique_lock<std::mutex>(this->mutex), (this->done = true);
this->condition.notify_one();
this->thread.join();
}
int overflow(int c) {
if (c != std::char_traits<char>::eof()) {
*this->pptr() = std::char_traits<char>::to_char_type(c);
this->pbump(1);
}
return this->sync() != -1
? std::char_traits<char>::not_eof(c): std::char_traits<char>::eof();
}
int sync() {
if (this->pbase() != this->pptr()) {
this->buffer.resize(std::size_t(this->pptr() - this->pbase()));
{
std::unique_lock<std::mutex> guard(this->mutex);
this->queue.push(std::move(this->buffer));
}
this->condition.notify_one();
this->buffer = std::vector<char>(128);
this->setp(this->buffer.data(),
this->buffer.data() + this->buffer.size() - 1);
}
return 0;
}
};
int main()
{
async_buf sbuf("async.out");
std::ostream astream(&sbuf);
std::ifstream in("async_stream.cpp");
for (std::string line; std::getline(in, line); ) {
astream << line << '\n' << std::flush;
}
}
Run Code Online (Sandbox Code Playgroud)