由于超时而取消async_read

yey*_*man 4 c++ boost-asio

我正在尝试编写一个包装器同步方法,async_read以允许在套接字上进行非阻塞读取.以下几个关于互联网的例子我已经开发了一个似乎几乎正确的解决方案但是没有用.

该类声明了这些相关的属性和方法:

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;

        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);

        ...
}
Run Code Online (Sandbox Code Playgroud)

该方法async_read_helper是封装了所有复杂的一个,而其他两个handle_readhandle_timeout仅仅是事件处理程序.以下是三种方法的实现:

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}

size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;

    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));

    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }

    return _bytes_transferred;
}
Run Code Online (Sandbox Code Playgroud)

我的主要问题是:为什么这个循环开启_io_service->poll_one(),没有循环和调用_io_service->run_one()?此外,我想知道对于更习惯使用Boost和Asio的人来说它是否正确.谢谢!


修正提案#1

根据Jonathan Wakely所做的评论,可以在操作完成后使用_io_service->run_one()调用替换循环_io_service->reset().它应该看起来像:

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}

_io_service->reset();
Run Code Online (Sandbox Code Playgroud)

经过一些测试,我已经检查过这种解决方案是不行的.handle_timeout使用错误代码连续调用该方法operation_aborted.这些电话怎么能停止?

修正提案#2

twsansbury的答案是准确的,并以可靠的文档为基础.该实现导致以下代码async_read_helper:

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();
Run Code Online (Sandbox Code Playgroud)

以及对handle_read方法的以下更改:

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}
Run Code Online (Sandbox Code Playgroud)

在测试期间,该解决方案已证明是可靠且正确

Tan*_*ury 7

io_service::run_one()和之间的主要区别io_service::poll_one()run_one()阻塞直到处理程序准备好运行,而poll_one()不会等待任何未完成的处理程序准备就绪.

假设只有优秀的处理器在_io_serviceARE handle_timeout()handle_read(),则run_one()不需要循环,因为它只会返回无论是一次handle_timeout()handle_read()已经跑了.另一方面,poll_one()需要一个循环,因为poll_one()它将立即返回,因为它们既不准备handle_timeout()也不handle_read()准备运行,导致函数最终返回.

原始代码以及修复提议#1的主要问题是,当async_read_helper()返回时,io_service中仍有未完成的处理程序.在下一次调用时async_read_helper(),要调用的下一个处理程序将是前一次调用的处理程序.该io_service::reset()方法仅允许io_service从停止状态恢复运行,它不会删除已经排队到io_service的任何处理程序.要解决此问题,请尝试使用循环来使用io_service中的所有处理程序.消耗掉所有处理程序后,退出循环并重置io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timer, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occured, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();
Run Code Online (Sandbox Code Playgroud)

从调用者的角度来看,这种形式的超时与run_one()块同步.但是,I/O服务中仍在进行工作.另一种方法是使用Boost.Asio 对C++期货支持来等待未来并执行超时.此代码可以更容易阅读,但它需要至少一个其他线程来处理I/O服务,因为等待超时的线程不再处理I/O服务:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then on_read.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}
Run Code Online (Sandbox Code Playgroud)