C++ Boost.Asio Windows file HANDLE: async_read_until loops forever - no EOF

Ton*_*roy 12 c++ file-io boost asynchronous boost-asio

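I'm using Boost 1.50 with VS2010, reading through a Windows file HANDLE (which seems to be relatively uncommon compared with asio usage with sockets).

Problem

The handle_read callback gets as far as line 8, then returns the first part of line 8 with all of line 1 appended; further callbacks cycle through from line 2 again, ad nauseam:

  • open a short text file (below)
  • get the expected handle_read callbacks with correct content for lines 1 through 7
  • the next callback has a longer-than-expected bytes-read length parameter
  • although length is not used, getline extracts a correspondingly longer line from the asio stream buffer
  • the extracted content switches mid-line to repeat the first line of the input file
  • further handle_read callbacks recycle lines 2 through 7, then the "long mixed" line problem occurs again
  • ad nauseam

Input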

LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
...3--E similarly...
LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789

Output

Here are the first 15 lines of output (it continues forever):

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #3 through #6 are fine too...
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...line #10 through #13 are fine...
line #14, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 92, getline() [91] 'LINE 8 abcdefghijklmnoLINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
...

Note that output lines #8 and #15 are a mix of input LINE 8 and LINE 1.

Code

#include "stdafx.h"

#include <cassert>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <Windows.h>
#include <WinBase.h>

class AsyncReader
{
  public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
      : io_service_(io_service),
        input_buffer(/*size*/ 8192),
        input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

  private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    boost::asio::windows::stream_handle input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;

        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);

        // method 2:
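        std::istream is(&input_buffer);
        std::string s;
        // Call getline() outside assert() so it still runs when NDEBUG is defined.
        const bool got_line = !std::getline(is, s).fail();
        assert(got_line);
        (void)got_line; // silence unused-variable warnings in release builds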

        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";

        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;

    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                );

    AsyncReader obj(io_service, handle);

    io_service.run();

    std::cout << "Normal termination\n";
    getchar();
    return 0;
}
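
My thoughts

  • It might be something in the CreateFile options - it didn't work at all until I switched to FILE_FLAG_OVERLAPPED - not sure whether there are other requirements that don't even manifest as errors...?
  • I've tried input_buffer.commit and even .consume - not sure whether there's something like that I'm supposed to do, even though all the example code I can find (for sockets) suggests getline takes care of that...
  • Exasperation / I miss Linux....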

kun*_*sch 5

A stream_handle will always read at offset zero. I think it is meant for socket handles and is useless for regular files.

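Calling async_read_until() fetches 512 bytes if the streambuf does not already contain a newline. The first call reads a little more than 7 lines. Once seven lines have been extracted, the remaining characters ("LINE 8 abcdefghijklmno") contain no newline, so (the same) 512 bytes are appended.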
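The arithmetic can be checked without Boost.Asio. The following is a plain C++ simulation, not the Asio implementation: `make_input`, `read_at_zero`, and `next_line` are hypothetical helpers, the 512-byte chunk size is taken from the description above, and each line is assumed to end in a single '\n' (70 bytes per line, which matches the lengths reported in the question).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for the question's input file: 16 lines labelled
// 1-9, 0, A-F, each 69 characters plus an assumed '\n' terminator.
inline std::string make_input()
{
    const std::string labels = "1234567890ABCDEF";
    const std::string body =
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    std::string file;
    for (std::size_t i = 0; i < labels.size(); ++i)
        file += std::string("LINE ") + labels[i] + " " + body + "\n";
    return file;
}

// Models a read that ignores the file position: it always returns the
// first `chunk` bytes of the file.
inline std::string read_at_zero(const std::string& file, std::size_t chunk)
{
    return file.substr(0, std::min(chunk, file.size()));
}

// Models one read-until-newline step: refill the buffer only while it
// holds no '\n', then extract one line (without the '\n').
inline std::string next_line(const std::string& file, std::string& buffer)
{
    // Without a newline in the first chunk the loop below would never end.
    assert(read_at_zero(file, 512).find('\n') != std::string::npos);

    std::string::size_type pos;
    while ((pos = buffer.find('\n')) == std::string::npos)
        buffer += read_at_zero(file, 512);

    std::string line = buffer.substr(0, pos);
    buffer.erase(0, pos + 1);
    return line;
}
```

Calling `next_line` eight times on `make_input()` with one shared buffer yields seven 69-character lines followed by a 91-character line: the 22 leftover bytes of LINE 8 with all of LINE 1 appended. 7 x 70 + 22 = 512, which is why the split falls after "abcdefghijklmno".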

To solve the problem, I suggest using a random_access_handle. You have to track the file position manually and replace async_read_until with async_read_at.

class AsyncReader
{
  ...
  void start_read()
  {
    async_read_at(input_handle, input_offset, input_buffer, ...);
  }
private:
  boost::asio::windows::random_access_handle input_handle;
  boost::uint64_t input_offset;
};

void AsyncReader::handle_read(const boost::system::error_code& error,
                              std::size_t length)
{
  input_offset += length;
  if (!error || error == boost::asio::error::eof)
  {
    ...
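The bookkeeping this implies can be sketched synchronously in plain C++. This is only an illustration of advancing an offset by the number of bytes each read returned and splitting lines by hand; `read_at` and `read_lines` are hypothetical helpers operating on an in-memory string, not Boost.Asio calls.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Models a positional read: returns up to `chunk` bytes starting at
// `offset`, or an empty string at end of file.
inline std::string read_at(const std::string& file, std::size_t offset,
                           std::size_t chunk)
{
    if (offset >= file.size())
        return std::string();
    return file.substr(offset, chunk);
}

// Splits `file` into lines by reading fixed-size chunks at an offset that
// is advanced by the number of bytes each read returned.
inline std::vector<std::string> read_lines(const std::string& file,
                                           std::size_t chunk)
{
    assert(chunk > 0);
    std::vector<std::string> lines;
    std::string buffer;
    std::size_t offset = 0;
    for (;;)
    {
        const std::string::size_type pos = buffer.find('\n');
        if (pos != std::string::npos)
        {
            lines.push_back(buffer.substr(0, pos));
            buffer.erase(0, pos + 1);
            continue;
        }
        const std::string data = read_at(file, offset, chunk);
        if (data.empty())
            break;                 // end of file
        offset += data.size();     // advance past the bytes just read
        buffer += data;
    }
    if (!buffer.empty())
        lines.push_back(buffer);   // final line without a trailing '\n'
    return lines;
}
```

Because the offset moves forward after every read, a line that straddles a chunk boundary is completed by the bytes that actually follow it, and a final line without a newline is still delivered once the end of the file is reached.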


Tan*_*ury 5

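A mailing list post describes the same problem. While CreateFile with FILE_FLAG_OVERLAPPED allows asynchronous I/O, it does not establish the handle as a stream in the context of Boost.Asio. For streams, Boost.Asio implements read_some as read_some_at with the offset always being 0. This is the source of the problem, as the ReadFile() documentation states:

For files that support byte offsets, you must specify a byte offset at which to start reading from the file.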
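For overlapped reads, that byte offset is passed in the OVERLAPPED structure as two 32-bit fields, Offset (low half) and OffsetHigh (high half). A portable sketch of the split, where `OffsetParts`, `join_offset`, and `split_offset` are hypothetical names standing in for the Windows types so that the snippet compiles anywhere:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the two 32-bit offset fields of OVERLAPPED.
struct OffsetParts
{
    std::uint32_t low;   // would go into OVERLAPPED::Offset
    std::uint32_t high;  // would go into OVERLAPPED::OffsetHigh
};

// Recombines the two halves into a 64-bit file offset.
inline std::uint64_t join_offset(const OffsetParts& parts)
{
    return (static_cast<std::uint64_t>(parts.high) << 32) | parts.low;
}

// Splits a 64-bit file offset into its low and high 32-bit halves.
inline OffsetParts split_offset(std::uint64_t offset)
{
    OffsetParts parts;
    parts.low  = static_cast<std::uint32_t>(offset & 0xFFFFFFFFu);
    parts.high = static_cast<std::uint32_t>(offset >> 32);
    assert(join_offset(parts) == offset);  // the split must round-trip
    return parts;
}
```

A read that always leaves both fields at zero starts from the beginning of the file every time, which matches the behaviour seen in the question.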


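Adapting to type requirements

Boost.Asio is very generic, often requiring arguments to meet certain type requirements rather than be a specific type. It is therefore often possible to adapt either the I/O object or its service to obtain the desired behavior. First, one must identify what the adapted interface needs to support. In this case, async_read_until accepts any type fulfilling the AsyncReadStream type requirements. The requirements of AsyncReadStream are fairly basic: a void async_read_some(MutableBufferSequence, ReadHandler) member function.

Since the offset value needs to be tracked throughout the composed async_read_until operation, a simple type meeting the ReadHandler requirements can be introduced that wraps the application's ReadHandler and updates the offset accordingly.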

namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
  read_some_offset_handler(Handler handler, boost::uint64_t& offset)
    : handler_(handler),
      offset_(offset)
  {}

  void operator()(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
  {
    offset_ += bytes_transferred;

    // If bytes were transferred, then set the error code as success.
    // EOF will be detected on next read.  This is to account for
    // the read_until algorithm behavior.
    const boost::system::error_code result_ec =
      (error && bytes_transferred)
      ? make_error_code(boost::system::errc::success) : error;

    handler_(result_ec, bytes_transferred);
  }

//private:
  Handler handler_;
  boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
///        within specific context.  This is critical to support
///        composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  detail::read_some_offset_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler_);
}

} // namespace detail
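The error-masking rule inside operator() can be looked at in isolation. The sketch below uses std::error_code instead of the Boost types, and `mask_partial_read_error` and `complete_read` are hypothetical names; it only mirrors the decision "bytes arrived together with an error, so report success now and let the error resurface on the next, empty read".

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <system_error>

// Returns the error code to pass on to the wrapped handler: an error that
// arrives together with data is reported as success, so the data is
// processed first.
inline std::error_code mask_partial_read_error(const std::error_code& error,
                                               std::size_t bytes_transferred)
{
    if (error && bytes_transferred > 0)
        return std::error_code();  // default-constructed means success
    return error;
}

// Mirrors the wrapper's bookkeeping: advance the offset, then mask.
inline std::error_code complete_read(std::uint64_t& offset,
                                     const std::error_code& error,
                                     std::size_t bytes_transferred)
{
    offset += bytes_transferred;
    const std::error_code result =
        mask_partial_read_error(error, bytes_transferred);
    // After masking, an error is never reported together with data.
    assert(!(result && bytes_transferred > 0));
    return result;
}
```

With this rule, a final read that returns the last bytes of the file along with an end-of-file condition is delivered as a successful read; the following read transfers zero bytes and reports the error unchanged.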

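The asio_handler_invoke hook will be found through ADL, so that user handlers are invoked in the proper context. This is critical for thread safety when a composed operation is invoked within a strand. For more details on composed operations and strands, see this answer.

The following class adapts boost::asio::windows::random_access_handle to meet the AsyncReadStream type requirements.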
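For readers unfamiliar with argument-dependent lookup, here is a minimal illustration that is independent of Boost.Asio. The namespaces `lib` and `app` and every name in them are hypothetical; the point is only that an unqualified call made inside library code also considers overloads declared in the namespace of the argument's type.

```cpp
#include <cassert>
#include <string>

namespace lib {

// Generic fallback, playing the role of a library default hook.
template <typename T>
std::string describe(const T&) { return "generic"; }

template <typename T>
std::string call_hook(const T& value)
{
    // Unqualified call: overloads in the namespace of T are considered too.
    return describe(value);
}

} // namespace lib

namespace app {

struct wrapped_handler {};

// Found through ADL because wrapped_handler lives in namespace app.
inline std::string describe(const wrapped_handler&) { return "app-specific"; }

} // namespace app

// Small self-check showing which overload each call selects.
inline void adl_demo()
{
    assert(lib::call_hook(42) == "generic");
    assert(lib::call_hook(app::wrapped_handler()) == "app-specific");
}
```

The hook above relies on the same mechanism: it is declared in namespace detail next to read_some_offset_handler, so an unqualified asio_handler_invoke call made on that handler type can find it.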

/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
  : public AsyncRandomAccessReadDevice
{
public:
  basic_adapted_stream(
    boost::asio::io_service& io_service,
    HANDLE handle
  )
    : AsyncRandomAccessReadDevice(io_service, handle),
      offset_(0)
  {}

  template<typename MutableBufferSequence,
           typename ReadHandler>
  void async_read_some(
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    async_read_at(*this, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }

private:
  boost::uint64_t offset_;
};
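The same adapter idea can be tried out synchronously, without Windows or Boost.Asio. In this sketch `memory_device`, `adapted_stream`, and `read_all` are hypothetical: a device that only supports positional reads gains a stream-style read_some() through a wrapper that remembers the offset, and a generic algorithm written against read_some() then works unchanged.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Hypothetical random-access device over an in-memory "file".
class memory_device
{
public:
    explicit memory_device(const std::string& data) : data_(data) {}

    // Copies up to `size` bytes starting at `offset`; returns the number
    // of bytes copied, 0 at end of data.
    std::size_t read_some_at(std::size_t offset, char* out,
                             std::size_t size) const
    {
        assert(out != 0 || size == 0);
        if (offset >= data_.size())
            return 0;
        const std::size_t n = std::min(size, data_.size() - offset);
        std::memcpy(out, data_.data() + offset, n);
        return n;
    }

private:
    std::string data_;
};

// Adds a stream-style read_some() to any device exposing read_some_at(),
// keeping the current position in offset_.
template <typename RandomAccessDevice>
class adapted_stream : public RandomAccessDevice
{
public:
    explicit adapted_stream(const std::string& data)
      : RandomAccessDevice(data), offset_(0) {}

    std::size_t read_some(char* out, std::size_t size)
    {
        const std::size_t n = this->read_some_at(offset_, out, size);
        offset_ += n;
        return n;
    }

private:
    std::size_t offset_;
};

// Generic algorithm written against the stream-style interface only.
template <typename Stream>
std::string read_all(Stream& stream)
{
    std::string result;
    char chunk[8];
    std::size_t n;
    while ((n = stream.read_some(chunk, sizeof chunk)) > 0)
        result.append(chunk, n);
    return result;
}
```

Each read_some() call continues where the previous one stopped, which is the property basic_adapted_stream adds to random_access_handle.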

Alternatively, boost::asio::windows::basic_stream_handle can be given a custom type that meets the StreamHandleService type requirements and implements async_read_some in terms of async_read_some_at.

/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
  : public boost::asio::windows::stream_handle_service
{
private:
  // The type of the platform-specific implementation.
  typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:

  /// The unique service identifier.
  static boost::asio::io_service::id id;

  /// Construct a new stream handle service for the specified io_service.
  explicit offset_stream_handle_service(boost::asio::io_service& io_service)
    : boost::asio::windows::stream_handle_service(io_service),
      service_impl_(io_service),
      offset_(0)
  {}

  /// Start an asynchronous read.
  template <typename MutableBufferSequence,
            typename ReadHandler>
  void
  async_read_some(
    implementation_type& impl,
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    // Implement async_read_some in terms of async_read_some_at.  The provided
    // ReadHandler will be hoisted in an internal handler so that offset_ can
    // be properly updated.
    service_impl_.async_read_some_at(impl, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }
private:
  // The platform-specific implementation.
  service_impl_type service_impl_;
  boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;

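I have opted for simplicity in the example code, but the same service will be used by multiple I/O objects. The offset_stream_handle_service would therefore need to manage one offset per I/O object to function properly when multiple I/O objects use the service.

To use the adapted types, change the AsyncReader::input_handle member variable to either basic_adapted_stream<boost::asio::windows::random_access_handle> (adapted I/O object) or boost::asio::windows::basic_stream_handle<offset_stream_handle_service> (adapted service).


Here is the complete example based on the original code, modifying only the type of AsyncReader::input_handle: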
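One possible shape for that bookkeeping is a table keyed by the native handle value. This is a sketch under assumptions, not part of the service above: `offset_table` and its members are hypothetical, and it ignores synchronization, which would be needed if completion handlers can run on several threads.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical table of read positions, one entry per handle.
class offset_table
{
public:
    typedef const void* handle_key;

    // Current read position for `handle`; unknown handles start at 0.
    std::uint64_t get(handle_key handle) const
    {
        std::map<handle_key, std::uint64_t>::const_iterator it =
            offsets_.find(handle);
        return it == offsets_.end() ? 0 : it->second;
    }

    // Advances the position after a read that transferred `bytes` bytes.
    void advance(handle_key handle, std::uint64_t bytes)
    {
        assert(handle != 0);
        offsets_[handle] += bytes;  // new entries are value-initialized to 0
    }

    // Forgets a handle once it has been closed.
    void erase(handle_key handle) { offsets_.erase(handle); }

private:
    std::map<handle_key, std::uint64_t> offsets_;
};
```

A service could consult such a table in async_read_some instead of a single offset_ member; whether the entry is better stored in the table or in the per-object implementation data is a design choice this sketch leaves open.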

#include "stdafx.h"

#include <cassert>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <Windows.h>
#include <WinBase.h>


namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
  read_some_offset_handler(Handler handler, boost::uint64_t& offset)
    : handler_(handler),
      offset_(offset)
  {}

  void operator()(
    const boost::system::error_code& error,
    std::size_t bytes_transferred)
  {
    offset_ += bytes_transferred;

    // If bytes were transferred, then set the error code as success.
    // EOF will be detected on next read.  This is to account for
    // the read_until algorithm behavior.
    const boost::system::error_code result_ec =
      (error && bytes_transferred)
      ? make_error_code(boost::system::errc::success) : error;

    handler_(result_ec, bytes_transferred);
  }

//private:
  Handler handler_;
  boost::uint64_t& offset_;
};

/// @brief Hook that allows the wrapped handler to be invoked
///        within specific context.  This is critical to support
///        composed operations being invoked within a strand.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  detail::read_some_offset_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler_);
}

} // namespace detail

/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
  : public AsyncRandomAccessReadDevice
{
public:
  basic_adapted_stream(
    boost::asio::io_service& io_service,
    HANDLE handle
  )
    : AsyncRandomAccessReadDevice(io_service, handle),
      offset_(0)
  {}

  template<typename MutableBufferSequence,
           typename ReadHandler>
  void async_read_some(
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    async_read_at(*this, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }

private:
  boost::uint64_t offset_;
};

/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
  : public boost::asio::windows::stream_handle_service
{
private:
  // The type of the platform-specific implementation.
  typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:

  /// The unique service identifier.
  static boost::asio::io_service::id id;

  /// Construct a new stream handle service for the specified io_service.
  explicit offset_stream_handle_service(boost::asio::io_service& io_service)
    : boost::asio::windows::stream_handle_service(io_service),
      service_impl_(io_service),
      offset_(0)
  {}

  /// Start an asynchronous read.
  template <typename MutableBufferSequence,
            typename ReadHandler>
  void
  async_read_some(
    implementation_type& impl,
    const MutableBufferSequence& buffers,
    ReadHandler handler)
  {
    // Implement async_read_some in terms of async_read_some_at.  The provided
    // ReadHandler will be hoisted in an internal handler so that offset_ can
    // be properly updated.
    service_impl_.async_read_some_at(impl, offset_, buffers, 
      detail::read_some_offset_handler<ReadHandler>(handler, offset_));
  }
private:
  // The platform-specific implementation.
  service_impl_type service_impl_;
  boost::uint64_t offset_;
};

boost::asio::io_service::id offset_stream_handle_service::id;

#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
    boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
    offset_stream_handle_service> adapted_stream;
#endif

class AsyncReader
{
  public:
    AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
      : io_service_(io_service),
        input_buffer(/*size*/ 8192),
        input_handle(io_service, handle)
    {
        start_read();
    }

    void start_read()
    {
        boost::asio::async_read_until(input_handle, input_buffer, '\n',
            boost::bind(&AsyncReader::handle_read, this,
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred));
    }

    void handle_read(const boost::system::error_code& error, std::size_t length);
    // void handle_write(const boost::system::error_code& error);

  private:
    boost::asio::io_service& io_service_;
    boost::asio::streambuf input_buffer;
    adapted_stream input_handle;
};

void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
    if (!error)
    {
        static int count = 0;
        ++count;

        // method 1: (same problem)
        // const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
        // std::string s(pStart, length);
        // input_buffer.consume(length);

        // method 2:
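        std::istream is(&input_buffer);
        std::string s;
        // Call getline() outside assert() so it still runs when NDEBUG is defined.
        const bool got_line = !std::getline(is, s).fail();
        assert(got_line);
        (void)got_line; // silence unused-variable warnings in release builds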

        std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";

        start_read();
    }
    else if (error == boost::asio::error::not_found)
        std::cerr << "Did not receive ending character!\n";
    else
        std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
    boost::asio::io_service io_service;

    HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
                                 GENERIC_READ,
                                 0, // share mode
                                 NULL, // security attribute: NULL = default
                                 OPEN_EXISTING, // creation disposition
                                 FILE_FLAG_OVERLAPPED,
                                 NULL // template file
                                );

    AsyncReader obj(io_service, handle);

    io_service.run();

    std::cout << "Normal termination\n";
    getchar();
    return 0;
}

With the input from the original question, this produces the following output:

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Misc error during read!
Normal termination

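My input file does not have a \n character at the end of LINE F. AsyncReader::handle_read() is therefore invoked with an error of boost::asio::error::eof, and the contents of input_buffer still contain LINE F. After modifying the final else case to print more information: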

...
else
{
    std::cerr << "Error: " << error.message() << "\n";

    if (std::size_t buffer_size = input_buffer.size())
    {
        boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
        std::string contents(boost::asio::buffers_begin(bufs),
                             boost::asio::buffers_begin(bufs) + buffer_size);
        std::cerr << "stream contents: '" << contents << "'\n";
    }
}

I get the following output:

line #1, length 70, getline() [69] 'LINE 1 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #2, length 70, getline() [69] 'LINE 2 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #3, length 70, getline() [69] 'LINE 3 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #4, length 70, getline() [69] 'LINE 4 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #5, length 70, getline() [69] 'LINE 5 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #6, length 70, getline() [69] 'LINE 6 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #7, length 70, getline() [69] 'LINE 7 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #8, length 70, getline() [69] 'LINE 8 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #9, length 70, getline() [69] 'LINE 9 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #10, length 70, getline() [69] 'LINE 0 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #11, length 70, getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #12, length 70, getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #13, length 70, getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #14, length 70, getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
line #15, length 70, getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Error: End of file
stream contents: 'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
Normal termination