如何处理读入ASIO streambuf的多余字符?

Arp*_*ius 4 c++ boost boost-asio c++11

大多数库的解析器只能在一个std::istream或单个连续缓冲区上运行。这些解析器读取istream直到eof结束,而不是文档结束。即使有一个boost::asio::streambuf可以使用的istream,也存在读取和提交一帧图像的问题。诸如此类read_until的函数将提交所读取的任何内容,并且如果它们读取下一帧的片段,则解析填充将失败。

这个嘲笑example on Coliru正说明问题所在。

假设我们需要一个有效的解决方案,而无需复制缓冲区,那么我需要确保流的末尾是文档的正确末尾。我当前的解决方案是扫描数据,并在一个准备好的缓冲区上进行提交/消耗乘以:

size_t read_some_frames( boost::asio::streambuf& strbuf, 
                         std::function< void(istream&) > parser ) {
        auto buffers= strbuf.prepare( 1024 );
        size_t read= bad_case_of_read_some( buffers );

        vector< std::pair< size_t, size_t > > frames;
        std::pair< size_t, size_t > leftover= scanForFrames( 
                    buffers_begin(buffers), 
                    buffers_begin(buffers)+read, 
                    frames, '\0' );

        for( auto const& frame: frames ) {
            cout << "Frame size: " << frame.first 
                      << " skip: " << frame.second << endl;
            strbuf.commit( frame.first );
            strbuf.consume( frame.second );
            iostream stream( &strbuf );
            parser( stream );
        }
        cout << "Unfinished frame size: " << leftover.first 
                             << " skip:" << leftover.second << endl;
        strbuf.commit( leftover.first );
        strbuf.consume( leftover.second );
        return read;
}
Run Code Online (Sandbox Code Playgroud)

Live on Coliru

根据文档,这是错误的。我认为这段代码行得通,因为调用commit和消耗不会释放内部缓冲区。我需要以某种方式处理这个问题。

有什么可能的解决方案?

Tan*_*ury 5

read_until()操作将所有读取的数据提交到streambuf的输入序列中时,它们将返回一个bytes_transferred值,该值包含直到第一个定界符(包括第一个定界符)的字节数。从本质上讲,它提供了帧的大小,并且可以通过以下两种方式限制istream只读取streambuf输入序列的一部分:

  • 使用istream限制从streambuf读取的字节数的自定义。一种简单的方法是使用Boost.IOStream boost::iostreams::stream并实现Source概念的模型。
  • 创建一个streambuf衍生自Boost.Asio的自定义streambuf。为了限制从可用输入序列中读取的字节数,自定义函数将需要操纵输入序列的末尾。此外,该风俗streambuf将需要处理下溢。

自定义SourceBoost.IOStream

Boost.IOStream的boost::iostreams::stream对象将I / O操作委托给设备。设备是实现各种Boost.IOStream概念模型的用户代码。在这种情况下,仅需要一个对一个字符序列提供读访问权限的Source概念。此外,当boost::iostreams::stream使用源设备时,它将从中继承std::basic_istream

在以下代码中,asio_streambuf_input_device是从Boost.Asio streambuf中读取的Source概念的模型。读取给定数量的字节后,asio_streambuf_input_device即使基础streambuf仍在其输入序列中包含数据,也指示下溢。

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

// ...

// Create a custom iostream that sets a limit on the amount of bytes
// that will be read from the streambuf.
boost::iostreams::stream<asio_streambuf_input_device> input(streambuf, n);
parse(input);
Run Code Online (Sandbox Code Playgroud)

这是演示此方法的完整示例:

#include <functional>
#include <iostream>
#include <string>

#include <boost/asio.hpp>
#include <boost/iostreams/concepts.hpp>  // boost::iostreams::source
#include <boost/iostreams/stream.hpp>

/// Type that implements a model of the Boost.IOStream's Source concept
/// for reading data from a Boost.Asio streambuf
class asio_streambuf_input_device
  : public boost::iostreams::source // Use convenience class.
{
public:

  explicit
  asio_streambuf_input_device(
      boost::asio::streambuf& streambuf,
      std::streamsize bytes_transferred
  )
    : streambuf_(streambuf),
      bytes_remaining_(bytes_transferred)
  {}

  std::streamsize read(char_type* buffer, std::streamsize buffer_size)
  {
    // Determine max amount of bytes to copy.
    auto bytes_to_copy =
      std::min(bytes_remaining_, std::min(
          static_cast<std::streamsize>(streambuf_.size()), buffer_size));

    // If there is no more data to be read, indicate end-of-sequence per
    // Source concept.
    if (0 == bytes_to_copy)
    {
      return -1; // Indicate end-of-sequence, per Source concept.
    }

    // Copy from the streambuf into the provided buffer.
    std::copy_n(buffers_begin(streambuf_.data()), bytes_to_copy, buffer);

    // Update bytes remaining.
    bytes_remaining_ -= bytes_to_copy;

    // Consume from the streambuf.
    streambuf_.consume(bytes_to_copy);

    return bytes_to_copy;
  }

private:
  boost::asio::streambuf& streambuf_;
  std::streamsize bytes_remaining_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  boost::asio::streambuf streambuf;

  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }
  std::cout << "streambuf contains: " << make_string(streambuf) <<
               std::endl;

  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');

    // Create a custom iostream that sets a limit on the amount of bytes
    // that will be read from the streambuf.
    boost::iostreams::stream<asio_streambuf_input_device> input(
      streambuf, bytes_transferred);

    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

输出:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
Run Code Online (Sandbox Code Playgroud)

从获得 boost::asio::streambuf

可以安全地从Boost.Asio派生streambuf并实现自定义行为。在这种情况下,目标是限制istream导致下溢之前可以从输入序列中提取的字节数。这可以通过以下方式完成:

  • 更新streambuf的get-area(输入序列)指针,使其仅包含所需数量的要读取的字节。这可以通过将get-area指针(egptr)的末尾设置为n当前字符get-area指针(gptr)之后的字节来实现。在下面的代码中,我将此称为framing
  • 处理一个underflow()。如果已到达当前的末尾,则返回EOF
/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

// ...

basic_framed_streambuf<> streambuf;
// ....
streambuf.frame(n);
std::istream input(&streambuf);
parse(input);
streambuf.unframe();
Run Code Online (Sandbox Code Playgroud)

这是演示此方法的完整示例:

#include <functional>
#include <iostream>
#include <string>

#include <boost/asio.hpp>

/// @brief Type that derives from Boost.Asio streambuf and can frame the
///        input sequence to a portion of the actual input sequence.
template <typename Allocator = std::allocator<char> >
class basic_framed_streambuf
  : public boost::asio::basic_streambuf<Allocator>
{
private:

  typedef boost::asio::basic_streambuf<Allocator> parent_type;

public:

  explicit 
  basic_framed_streambuf(
    std::size_t maximum_size = (std::numeric_limits< std::size_t >::max)(),
    const Allocator& allocator = Allocator()
  )
    : parent_type(maximum_size, allocator),
      egptr_(nullptr)
  {}

  /// @brief Limit the current input sequence to n characters.
  ///
  /// @remark An active frame is invalidated by any member function that
  ///        modifies the input or output sequence.
  void frame(std::streamsize n)
  {
    // Store actual end of input sequence.
    egptr_ = this->egptr();
    // Set the input sequence end to n characters from the current
    // input sequence pointer..
    this->setg(this->eback(), this->gptr(), this->gptr() + n);
  }

  /// @brief Restore the end of the input sequence.
  void unframe()
  {
    // Restore the end of the input sequence.
    this->setg(this->eback(), this->gptr(), this->egptr_);
    egptr_ = nullptr;
  }

protected:

  // When the end of the input sequence has been reached, underflow
  // will be invoked.
  typename parent_type::int_type underflow()
  {
    // If the  streambuf is currently framed, then return eof
    // on underflow.  Otherwise, defer to the parent implementation.
    return egptr_ ? parent_type::traits_type::eof()
                  : parent_type::underflow();
  }

private:
  char* egptr_;
};

typedef basic_framed_streambuf<> framed_streambuf;

/// @brief RAII type that helps frame a basic_framed_streambuf within a 
///        given scope.
template <typename Streambuf>
class streambuf_frame
{
public:
  explicit streambuf_frame(Streambuf& streambuf, std::streamsize n)
    : streambuf_(streambuf)
  {
    streambuf_.frame(n);
  }

  ~streambuf_frame() { streambuf_.unframe(); }

  streambuf_frame(const streambuf_frame&) = delete;
  streambuf_frame& operator=(const streambuf_frame&) = delete;

private:
  Streambuf& streambuf_;
};

/// @brief Convert a streambuf to a string.
std::string make_string(boost::asio::streambuf& streambuf)
{
  return std::string(buffers_begin(streambuf.data()),
                     buffers_end(streambuf.data()));
}

// This example is not interested in the handlers, so provide a noop function
// that will be passed to bind to meet the handler concept requirements.
void noop() {}

int main()
{
  using boost::asio::ip::tcp;
  boost::asio::io_service io_service;

  // Create all I/O objects.
  tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), 0));
  tcp::socket server_socket(io_service);
  tcp::socket client_socket(io_service);

  // Connect client and server sockets.
  acceptor.async_accept(server_socket, std::bind(&noop));
  client_socket.async_connect(acceptor.local_endpoint(), std::bind(&noop));
  io_service.run();

  // Write to client.
  const std::string message =
    "12@"
    "345@";
  write(server_socket, boost::asio::buffer(message));

  framed_streambuf streambuf;

  // Demonstrate framing the streambuf's input sequence manually.
  {
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    // Verify that the entire message "12@345@" was read into
    // streambuf's input sequence.
    assert(message.size() == streambuf.size());
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.
    streambuf.frame(bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    int data = 0;
    input >> data; // Consumes "12" from input sequence.
    assert(data == 12);
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // Consume "@" from input sequence.
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;

    // Restore the streambuf.
    streambuf.unframe();
  }

  // Demonstrate using an RAII helper to frame the streambuf's input
  // sequence.
  {
    // As the streambuf's input sequence already contains the delimiter,
    // this operation will not actually attempt to read data from the
    // socket.
    auto bytes_transferred = read_until(client_socket, streambuf, '@');
    std::cout << "streambuf contains: " << make_string(streambuf) <<
                  std::endl;

    // Frame the streambuf based on bytes_transferred.  This is all data
    // up to and including the first delimiter.  Use a frame RAII object
    // to only frame the streambuf within the current scope.
    streambuf_frame<framed_streambuf> frame(streambuf, bytes_transferred);

    // Use an istream to read data from the currently framed streambuf.
    std::istream input(&streambuf);
    std::string data;
    getline(input, data, '@'); // Consumes delimiter.
    assert(data == "345");
    std::cout << "Extracted: " << data << std::endl;
    assert(!input.eof());
    input.get(); // No more data available in the frame, so underflow.
    assert(input.eof());
    std::cout << "istream has reached EOF" << std::endl;
    // The frame object's destructor will unframe the streambuf.
  }

  assert(streambuf.size() == 0);
  std::cout << "streambuf is empty" << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

输出:

streambuf contains: 12@345@
Extracted: 12
istream has reached EOF
streambuf contains: 345@
Extracted: 345
istream has reached EOF
streambuf is empty
Run Code Online (Sandbox Code Playgroud)