Ton*_*roy 12 c++ file-io boost asynchronous boost-asio
I'm using boost 1.50 with VS2010, reading through a Windows file HANDLE (which seems to be relatively uncommon compared with asio usage on sockets).
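The problem: the handle_read callback gets to line 8 and returns the first bit of it with all of line 1 appended; further callbacks then cycle through from line 2 again, ad nauseam:
- I get the expected handle_read callbacks with the correct content for lines 1 through 7.
- The next callback has a longer-than-expected length parameter, and getline extracts a correspondingly longer line from the asio stream buffer.
- Further handle_read callbacks recycle lines 2 through 7, and then the "long mixed" line problem occurs again.

Input file:
LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789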
LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
...3--E similarly...
LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
Here are the first 15 lines of output (it continues forever):
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #3 through #6 are fine too...
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #10 through #13 are fine...
line #14, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...
Note that output lines #8 and #15 are a mix of input LINE 8 and LINE 1.
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>

class AsyncReader
{
public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
        : io_service_(io_service),
          input_buffer(/*size*/ 8192),
          input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    boost::asio::windows::stream_handle input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;
        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);
        // method 2:
        std::istream is(&input_buffer);
        std::string s;
        assert(std::getline(is, s));
        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;
    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                 );
    AsyncReader obj(io_service, handle);
    io_service.run();
    std::cout << "Normal termination\n";
    getchar();
    return 0;
}
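Possible causes I have considered:
- Something in the CreateFile options: it didn't work at all until I switched to FILE_FLAG_OVERLAPPED, and I'm not sure whether there are other requirements that don't even manifest as errors.
- input_buffer.commit or even .consume: I'm not sure whether I should be calling something like that, even though all the sample code I could find (for sockets) suggests that getline takes care of it.

A stream_handle will always read at offset zero. I think it is meant for socket handles and is useless for regular files.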
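If the streambuf does not already contain a newline, a call to async_read_until() fetches 512 bytes. The first call therefore reads a little more than 7 lines (7 x 70 = 490 bytes, plus 22 more). Once those seven lines have been extracted, the remaining 22 characters ("LINE 8 abcdefghijklmno") contain no newline, so another 512 bytes (the same ones, again from offset zero) are appended.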
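The arithmetic above can be reproduced without Boost or a Windows handle. The following is only a rough model, assuming the behavior described in this answer (every read returns the first 512 bytes of the file); make_input, read_at_offset_zero, next_line and simulate_reads are hypothetical names made up for this sketch and are not part of Boost.Asio.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Rebuilds the question's input: 16 lines labelled 1-9, 0, A-F,
// each 69 characters long plus a trailing '\n' (70 bytes per line).
std::string make_input()
{
    const std::string labels = "1234567890ABCDEF";
    const std::string body =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    std::string file;
    for (std::size_t i = 0; i < labels.size(); ++i)
        file += std::string("LINE ") + labels[i] + " " + body + "\n";
    return file;
}

// Model of a read that ignores the file position: every call appends the
// first `chunk` bytes of the file to the buffer (the offset is always 0).
void read_at_offset_zero(const std::string& file, std::string& buffer,
                         std::size_t chunk)
{
    buffer.append(file, 0, chunk);
}

// Model of async_read_until followed by getline: keep reading until the
// buffer holds a '\n', then remove and return one line (without the '\n').
std::string next_line(const std::string& file, std::string& buffer,
                      std::size_t chunk)
{
    std::string::size_type pos;
    while ((pos = buffer.find('\n')) == std::string::npos)
        read_at_offset_zero(file, buffer, chunk);
    std::string line = buffer.substr(0, pos);
    buffer.erase(0, pos + 1);
    return line;
}

// Returns the first `count` lines this buggy reader model would deliver.
std::vector<std::string> simulate_reads(std::size_t count,
                                        std::size_t chunk = 512)
{
    // A chunk must cover at least one full 70-byte line, otherwise the
    // model would never find a '\n' and would loop forever.
    assert(chunk >= 70);
    const std::string file = make_input();
    std::string buffer;
    std::vector<std::string> lines;
    for (std::size_t i = 0; i < count; ++i)
        lines.push_back(next_line(file, buffer, chunk));
    return lines;
}
```

Calling simulate_reads(15) yields seven 69-character lines, then a 91-character line that starts with "LINE 8 abcdefghijklmnoLINE 1 ", then LINE 2 through LINE 7, then the mixed line again, which matches the pattern in the question's output.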
To fix this, I suggest using a random_access_handle. You have to track the file position manually and replace async_read_until with async_read_at.
class AsyncReader
{
    ...
    void start_read()
    {
        async_read_at(input_handle, input_offset, input_buffer, ...);
    }
private:
    boost::asio::windows::random_access_handle input_handle;
    boost::uint64_t input_offset;
};

void AsyncReader::handle_read(const boost::system::error_code& error,
                              std::size_t length)
{
    input_offset += length;
    if (!error || error == boost::asio::error::eof)
    {
        ...
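To illustrate the bookkeeping involved, here is a small synchronous sketch that uses only the standard library. It is not Boost.Asio code: read_at and read_lines are hypothetical helpers that model a positional read and the caller-side offset that has to be advanced after every read, including the end-of-file case where the last line has no trailing newline.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Model of a positional read: appends up to `chunk` bytes starting at
// `offset` to `out` and returns the number of bytes read (0 at end of file).
std::size_t read_at(const std::string& file, std::size_t offset,
                    std::size_t chunk, std::string& out)
{
    if (offset >= file.size())
        return 0;
    const std::size_t n = std::min(chunk, file.size() - offset);
    out.append(file, offset, n);
    return n;
}

// Splits `file` into lines using chunked positional reads. The offset is
// advanced by the number of bytes each read returned, which is the
// bookkeeping a random-access handle leaves to the application.
std::vector<std::string> read_lines(const std::string& file, std::size_t chunk)
{
    assert(chunk > 0);
    std::vector<std::string> lines;
    std::string buffer;
    std::size_t offset = 0;
    for (;;)
    {
        const std::string::size_type pos = buffer.find('\n');
        if (pos != std::string::npos)
        {
            lines.push_back(buffer.substr(0, pos));
            buffer.erase(0, pos + 1);
            continue;
        }
        const std::size_t n = read_at(file, offset, chunk, buffer);
        offset += n;                 // advance past what was just read
        if (n == 0)                  // end of file
        {
            if (!buffer.empty())     // final line without a trailing '\n'
                lines.push_back(buffer);
            return lines;
        }
    }
}
```

With the input "one\ntwo\nthree" and a chunk size of 4, read_lines returns the three lines "one", "two" and "three"; the last one is only recovered because the leftover buffer is checked when the read reports end of file.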
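This mailing list post describes the same problem. While CreateFile with FILE_FLAG_OVERLAPPED allows asynchronous I/O, it does not establish the handle as a stream in the context of Boost.Asio. For streams, Boost.Asio implements read_some as read_some_at with the offset always being 0. This is the source of the problem, as the ReadFile() documentation states:
For files that support byte offsets, you must specify a byte offset at which to start reading from the file.
Boost.Asio is written very generically, often requiring arguments to meet a certain type requirement rather than be a specific type. Therefore, it is often possible to adapt either the I/O object or its service to obtain the desired behavior. First, one must identify what the adapted interface needs to support. In this case, async_read_until accepts any type fulfilling the type requirements of AsyncReadStream. AsyncReadStream's requirements are fairly basic, requiring a void async_read_some(MutableBufferSequence, ReadHandler) member function.
As the offset value needs to be tracked throughout the composed async_read_until operation, a simple type meeting the requirements of ReadHandler can be introduced that wraps the application's ReadHandler and updates the offset accordingly.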
namespace detail {

/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
    read_some_offset_handler(Handler handler, boost::uint64_t& offset)
        : handler_(handler),
          offset_(offset)
    {}

    void operator()(
        const boost::system::error_code& error,
        std::size_t bytes_transferred)
    {
        offset_ += bytes_transferred;
        // If bytes were transferred, then set the error code as success.
        // EOF will be detected on next read. This is to account for
        // the read_until algorithm behavior.
        const boost::system::error_code result_ec =
            (error && bytes_transferred)
            ? make_error_code(boost::system::errc::success) : error;
        handler_(result_ec, bytes_transferred);
    }

//private:
    Handler handler_;
    boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
    Function function,
    detail::read_some_offset_handler<Handler>* handler)
{
    boost_asio_handler_invoke_helpers::invoke(
        function, handler->handler_);
}

} // namespace detail
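The asio_handler_invoke hook is found through ADL so that user handlers are invoked in the proper context. This is critical for thread safety when a composed operation is invoked within a strand. For more details on composed operations and strands, see this answer.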
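For readers unfamiliar with how such a hook gets picked up, the sketch below shows the lookup mechanism in isolation, using only the standard library. All names (wrapping, library, invoke_hook, dispatch, append_run and the two demo functions) are hypothetical; it only assumes that the generic code calls the hook unqualified, which is how an ADL-based customization point is typically arranged.

```cpp
#include <cassert>
#include <string>

namespace wrapping {

// Hypothetical handler type, standing in for a wrapper such as
// read_some_offset_handler.
struct wrapped_handler
{
    std::string* log;
};

// Hook defined in the same namespace as wrapped_handler, so an unqualified
// call with a wrapped_handler* argument finds it through ADL.
template <typename Function>
void invoke_hook(Function function, wrapped_handler* handler)
{
    *handler->log += "custom;";
    function();
}

} // namespace wrapping

namespace library {

// Fallback hook, used when the handler's namespace offers nothing better.
template <typename Function>
void invoke_hook(Function function, ...)
{
    function();
}

// Generic code: the unqualified call lets ADL add hooks from the namespace
// of Handler to the overload set; the exact pointer match beats the
// ellipsis fallback.
template <typename Function, typename Handler>
void dispatch(Function function, Handler& handler)
{
    invoke_hook(function, &handler);
}

} // namespace library

// Small function object that records that it ran.
struct append_run
{
    std::string* log;
    void operator()() const { *log += "run;"; }
};

// Dispatches with a handler type that provides its own hook.
std::string demo_with_hook()
{
    std::string log;
    wrapping::wrapped_handler handler = { &log };
    append_run f = { &log };
    library::dispatch(f, handler);
    return log;
}

// Dispatches with a plain int, for which only the fallback exists.
std::string demo_without_hook()
{
    std::string log;
    int plain_handler = 0;
    append_run f = { &log };
    library::dispatch(f, plain_handler);
    return log;
}
```

Here demo_with_hook() returns "custom;run;" because the hook in namespace wrapping is selected, while demo_without_hook() returns "run;" through the fallback.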
The following class adapts boost::asio::windows::random_access_handle to meet the type requirements of AsyncReadStream.
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
    : public AsyncRandomAccessReadDevice
{
public:
    basic_adapted_stream(
        boost::asio::io_service& io_service,
        HANDLE handle
    )
        : AsyncRandomAccessReadDevice(io_service, handle),
          offset_(0)
    {}

    template<typename MutableBufferSequence,
             typename ReadHandler>
    void async_read_some(
        const MutableBufferSequence& buffers,
        ReadHandler handler)
    {
        async_read_at(*this, offset_, buffers,
            detail::read_some_offset_handler<ReadHandler>(handler, offset_));
    }

private:
    boost::uint64_t offset_;
};
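The same adapter idea can be tried out in a synchronous, Boost-free form. The sketch below is a simplified analogue rather than the class above: memory_device, adapted_stream and read_until are hypothetical names, and the generic read_until only requires a read_some() member, in the spirit of a type requirement such as AsyncReadStream.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical random-access device: every read names an explicit offset.
class memory_device
{
public:
    explicit memory_device(const std::string& data) : data_(data) {}

    // Copies up to `size` bytes starting at `offset`; returns 0 at the end.
    std::size_t read_some_at(std::size_t offset, char* out,
                             std::size_t size) const
    {
        if (offset >= data_.size())
            return 0;
        const std::size_t n = std::min(size, data_.size() - offset);
        data_.copy(out, n, offset);
        return n;
    }

private:
    std::string data_;
};

// Adapter: presents a stream-style read_some() on top of read_some_at()
// by remembering how far previous reads have advanced.
template <typename RandomAccessDevice>
class adapted_stream : public RandomAccessDevice
{
public:
    explicit adapted_stream(const std::string& data)
        : RandomAccessDevice(data), offset_(0) {}

    std::size_t read_some(char* out, std::size_t size)
    {
        const std::size_t n = this->read_some_at(offset_, out, size);
        offset_ += n;
        return n;
    }

private:
    std::size_t offset_;
};

// Generic algorithm that only needs a read_some() member. Returns false at
// end of data when no delimiter was found; `buffer` then holds the leftover.
template <typename Stream>
bool read_until(Stream& stream, std::string& buffer, char delim,
                std::string& line)
{
    for (;;)
    {
        const std::string::size_type pos = buffer.find(delim);
        if (pos != std::string::npos)
        {
            line = buffer.substr(0, pos);
            buffer.erase(0, pos + 1);
            return true;
        }
        char chunk[8];
        const std::size_t n = stream.read_some(chunk, sizeof chunk);
        if (n == 0)
            return false;
        buffer.append(chunk, n);
    }
}
```

Reading "alpha\nbeta\ngamma" through adapted_stream<memory_device> gives "alpha" and "beta"; the third call returns false and leaves "gamma" in the buffer, which mirrors the end-of-file case discussed at the end of this answer.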
Alternatively, boost::asio::windows::basic_stream_handle can be given a custom type that meets the StreamHandleService type requirements and implements async_read_some in terms of async_read_some_at.
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
    : public boost::asio::windows::stream_handle_service
{
private:
    // The type of the platform-specific implementation.
    typedef boost::asio::detail::win_iocp_handle_service service_impl_type;

public:
    /// The unique service identifier.
    static boost::asio::io_service::id id;

    /// Construct a new stream handle service for the specified io_service.
    explicit offset_stream_handle_service(boost::asio::io_service& io_service)
        : boost::asio::windows::stream_handle_service(io_service),
          service_impl_(io_service),
          offset_(0)
    {}

    /// Start an asynchronous read.
    template <typename MutableBufferSequence,
              typename ReadHandler>
    void
    async_read_some(
        implementation_type& impl,
        const MutableBufferSequence& buffers,
        ReadHandler handler)
    {
        // Implement async_read_some in terms of async_read_some_at. The provided
        // ReadHandler will be hoisted in an internal handler so that offset_ can
        // be properly updated.
        service_impl_.async_read_some_at(impl, offset_, buffers,
            detail::read_some_offset_handler<ReadHandler>(handler, offset_));
    }

private:
    // The platform-specific implementation.
    service_impl_type service_impl_;
    boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;
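I have opted for simplicity in the example code, but the same service will be used by multiple I/O objects. Thus, offset_stream_handle_service would need to manage an offset per handle to function properly when multiple I/O objects use the service.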
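One possible shape for that bookkeeping is a table keyed by handle. This is only a sketch under assumptions: offset_table and handle_id are hypothetical names, the handle is modelled as a plain int, and nothing here is wired into a real Boost.Asio service.

```cpp
#include <cassert>
#include <map>

// Hypothetical stand-in for a native handle value.
typedef int handle_id;

// Keeps one read offset per handle, so that a service shared by several
// I/O objects does not mix up their file positions.
class offset_table
{
public:
    // Registers a handle (or resets it) with a starting offset of 0.
    void open(handle_id handle) { offsets_[handle] = 0; }

    // Returns the offset for an open handle. References into a std::map
    // stay valid while other handles are added or removed, so a wrapping
    // handler may keep the returned reference until the handle is closed.
    unsigned long long& offset_for(handle_id handle)
    {
        std::map<handle_id, unsigned long long>::iterator it =
            offsets_.find(handle);
        assert(it != offsets_.end()); // the handle must have been opened
        return it->second;
    }

    // Forgets the offset when a handle is closed.
    void close(handle_id handle) { offsets_.erase(handle); }

private:
    std::map<handle_id, unsigned long long> offsets_;
};
```

Each I/O object would then advance only its own entry, for example table.offset_for(h) += bytes_transferred, instead of sharing the single offset_ member used in the simplified service above.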
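To use the adapted types, change the type of the AsyncReader::input_handle member variable to either basic_adapted_stream<boost::asio::windows::random_access_handle> (adapted I/O object) or boost::asio::windows::basic_stream_handle<offset_stream_handle_service> (adapted service).
Here is the complete example based on the original code, with only the type of AsyncReader::input_handle modified: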
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>

namespace detail {

/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
    read_some_offset_handler(Handler handler, boost::uint64_t& offset)
        : handler_(handler),
          offset_(offset)
    {}

    void operator()(
        const boost::system::error_code& error,
        std::size_t bytes_transferred)
    {
        offset_ += bytes_transferred;
        // If bytes were transferred, then set the error code as success.
        // EOF will be detected on next read. This is to account for
        // the read_until algorithm behavior.
        const boost::system::error_code result_ec =
            (error && bytes_transferred)
            ? make_error_code(boost::system::errc::success) : error;
        handler_(result_ec, bytes_transferred);
    }

//private:
    Handler handler_;
    boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
    Function function,
    detail::read_some_offset_handler<Handler>* handler)
{
    boost_asio_handler_invoke_helpers::invoke(
        function, handler->handler_);
}

} // namespace detail

/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
    : public AsyncRandomAccessReadDevice
{
public:
    basic_adapted_stream(
        boost::asio::io_service& io_service,
        HANDLE handle
    )
        : AsyncRandomAccessReadDevice(io_service, handle),
          offset_(0)
    {}

    template<typename MutableBufferSequence,
             typename ReadHandler>
    void async_read_some(
        const MutableBufferSequence& buffers,
        ReadHandler handler)
    {
        async_read_at(*this, offset_, buffers,
            detail::read_some_offset_handler<ReadHandler>(handler, offset_));
    }

private:
    boost::uint64_t offset_;
};

/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
    : public boost::asio::windows::stream_handle_service
{
private:
    // The type of the platform-specific implementation.
    typedef boost::asio::detail::win_iocp_handle_service service_impl_type;

public:
    /// The unique service identifier.
    static boost::asio::io_service::id id;

    /// Construct a new stream handle service for the specified io_service.
    explicit offset_stream_handle_service(boost::asio::io_service& io_service)
        : boost::asio::windows::stream_handle_service(io_service),
          service_impl_(io_service),
          offset_(0)
    {}

    /// Start an asynchronous read.
    template <typename MutableBufferSequence,
              typename ReadHandler>
    void
    async_read_some(
        implementation_type& impl,
        const MutableBufferSequence& buffers,
        ReadHandler handler)
    {
        // Implement async_read_some in terms of async_read_some_at. The provided
        // ReadHandler will be hoisted in an internal handler so that offset_ can
        // be properly updated.
        service_impl_.async_read_some_at(impl, offset_, buffers,
            detail::read_some_offset_handler<ReadHandler>(handler, offset_));
    }

private:
    // The platform-specific implementation.
    service_impl_type service_impl_;
    boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;

#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
    boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
    offset_stream_handle_service> adapted_stream;
#endif

class AsyncReader
{
public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
        : io_service_(io_service),
          input_buffer(/*size*/ 8192),
          input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    adapted_stream input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;
        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);
        // method 2:
        std::istream is(&input_buffer);
        std::string s;
        assert(std::getline(is, s));
        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}

int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;
    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                 );
    AsyncReader obj(io_service, handle);
    io_service.run();
    std::cout << "Normal termination\n";
    getchar();
    return 0;
}
With the input from the original question, this produces the following output:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Misc error during read!
Normal termination
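My input file did not have a \n character at the end of LINE F. Thus, AsyncReader::handle_read() is invoked with an error of boost::asio::error::eof, and input_buffer's contents contain LINE F. After modifying the final else case to print more information: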
...
    else
    {
        std::cerr << "Error: " << error.message() << "\n";
        if (std::size_t buffer_size = input_buffer.size())
        {
            boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
            std::string contents(boost::asio::buffers_begin(bufs),
                                 boost::asio::buffers_begin(bufs) + buffer_size);
            std::cerr << "stream contents: '" << contents << "'\n";
        }
    }
I get the following output:
line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Error: End of file
stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Normal termination