当boost :: asio :: io_service运行方法阻塞/解除阻塞时会感到困惑

Mis*_*tyD 82 c++ boost-asio

作为Boost.Asio的初学者,我很困惑io_service::run().如果有人能在这个方法阻止/解除阻止时向我解释,我将不胜感激.文件指出:

run()函数将阻塞,直到所有工作完成,并且不再需要调度处理程序,或者直到io_service停止处理.

多个线程可以调用该run()函数来设置一个线程池,io_service可以从中执行处理程序.在池中等待的所有线程都是等效的,并且io_service可以选择其中任何一个来调用处理程序.

run()函数的正常退出意味着io_service对象已停止(stopped()函数返回true).后续调用run(),run_one(),poll()poll_one()将除非有预先调用立即返回reset().

以下陈述是什么意思?

[...]不再派遣处理人员[...]


在试图理解行为时io_service::run(),我遇到了这个例子(例3a).在其中,我观察到io_service->run()块和等待工单.

// WorkerThread invines io_service->run()
void WorkerThread(boost::shared_ptr<boost::asio::io_service> io_service);
void CalculateFib(size_t);

boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<boost::asio::io_service::work> work(
   new boost::asio::io_service::work(*io_service));

// ...

boost::thread_group worker_threads;
for(int x = 0; x < 2; ++x)
{
  worker_threads.create_thread(boost::bind(&WorkerThread, io_service));
}

io_service->post( boost::bind(CalculateFib, 3));
io_service->post( boost::bind(CalculateFib, 4));
io_service->post( boost::bind(CalculateFib, 5));

work.reset();
worker_threads.join_all();
Run Code Online (Sandbox Code Playgroud)

但是,在我正在处理的以下代码中,客户端使用TCP/IP连接并且run方法阻塞,直到异步接收数据.

typedef boost::asio::ip::tcp tcp;
boost::shared_ptr<boost::asio::io_service> io_service(
    new boost::asio::io_service);
boost::shared_ptr<tcp::socket> socket(new tcp::socket(*io_service));

// Connect to 127.0.0.1:9100.
tcp::resolver resolver(*io_service);
tcp::resolver::query query("127.0.0.1", 
                           boost::lexical_cast< std::string >(9100));
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
socket->connect(endpoint_iterator->endpoint());

// Just blocks here until a message is received.
socket->async_receive(boost::asio::buffer(buf_client, 3000), 0,
                      ClientReceiveEvent);
io_service->run();

// Write response.
boost::system::error_code ignored_error;
std::cout << "Sending message \n";
boost::asio::write(*socket, boost::asio::buffer("some data"), ignored_error);
Run Code Online (Sandbox Code Playgroud)

run()对此的任何解释都将在以下两个示例中描述其行为.

Tan*_*ury 219

基础

让我们从一个简化的例子开始,检查相关的Boost.Asio部分:

void handle_async_receive(...) { ... }
void print() { ... }

...  

boost::asio::io_service io_service;
boost::asio::ip::tcp::socket socket(io_service);

...

io_service.post(&print);                             // 1
socket.connect(endpoint);                            // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_service.post(&print);                             // 4
io_service.run();                                    // 5
Run Code Online (Sandbox Code Playgroud)

什么是处理程序

一个处理程序只不过是一个回调多.在示例代码中,有3个处理程序:

  • print处理程序(1).
  • handle_async_receive处理程序(3).
  • print处理程序(4).

尽管print()使用了两次相同的函数,但每次使用都被认为是创建了自己唯一可识别的处理程序.处理程序可以有多种形状和大小,从上面的基本功能到更复杂的构造,例如从boost::bind()lambda和lambdas 生成的函数.无论复杂程度如何,处理程序仍然只是一个回调.

什么工作

工作是代表应用程序代码请求Boost.Asio进行的一些处理.有时Boost.Asio可能会在它被告知之后立即开始一些工作,有时它可能会等待稍后的工作.完成工作后,Boost.Asio将通过调用提供的处理程序来通知应用程序.

Boost.Asio的保证了处理器将只当前调用线程中运行run(),run_one(),poll(),或poll_one().这些线程将起作用并调用处理程序.因此,在上面的例子中,print()当它被发布到io_service(1)中时不被调用.相反,它被添加到io_service并将在稍后的时间点调用.在这种情况下,它在io_service.run()(5)内.

什么是异步操作?

一个异步操作创建工作,Boost.Asio的将调用处理程序通知应用程序时的工作已经完成.通过调用具有前缀名称的函数来创建异步操作async_.这些功能也称为启动功能.

异步操作可以分解为三个独特的步骤:

  • 启动或通知相关的io_service工作需要完成.该async_receive操作(3)通知io_service,它需要从套接字异步读取数据,然后async_receive立即返回.
  • 做实际的工作.在这种情况下,当socket接收数据时,将读取并复制字节buffer.实际工作将在以下任何一项中完成:
    • 启动函数(3),如果Boost.Asio可以确定它不会阻塞.
    • 当应用程序显式运行io_service(5)时.
  • 调用handle_async_receive ReadHandler.再一次,处理程序只在运行它的线程中调用io_service.因此,无论何时完成工作(3或5),都保证handle_async_receive()只在io_service.run()(5)内调用.

这三个步骤之间的时间和空间分离称为控制流反转.这是使异步编程变得困难的复杂性之一.但是,有一些技术可以帮助缓解这种情况,例如使用协同程序.

怎么io_service.run()办?

当一个线程调用时io_service.run(),将从该线程中调用工作和处理程序.在上面的例子中,io_service.run()(5)将阻塞,直到:

  • 它已从两个print处理程序调用并返回,接收操作以成功或失败完成,并且handle_async_receive已调用并返回其处理程序.
  • 通过io_service明确停止io_service::stop().
  • 从处理程序中抛出异常.

一个潜在的伪造流程可以描述如下:

create io_service
create socket
add print handler to io_service (1)
wait for socket to connect (2)
add an asynchronous read work request to the io_service (3)
add print handler to io_service (4)
run the io_service (5)
  is there work or handlers?
    yes, there is 1 work and 2 handlers
      does socket have data? no, do nothing
      run print handler (1)
  is there work or handlers?
    yes, there is 1 work and 1 handler
      does socket have data? no, do nothing
      run print handler (4)
  is there work or handlers?
    yes, there is 1 work
      does socket have data? no, continue waiting
  -- socket receives data --
      socket has data, read it into buffer
      add handle_async_receive handler to io_service
  is there work or handlers?
    yes, there is 1 handler
      run handle_async_receive handler (3)
  is there work or handlers?
    no, set io_service as stopped and return

请注意如何当读完成后,它增加了一个处理程序io_service.这个细微的细节是异步编程的一个重要特征.它允许将处理程序链接在一起.例如,如果handle_async_receive没有得到它预期的所有数据,那么它的实现可以发布另一个异步读操作,导致io_service有更多的工作,因此不会返回io_service.run().

请注意,当io_service已经用完工作时,应用程序必须reset()io_service再次运行它之前.


示例问题和示例3a代码

现在,让我们检查一下问题中引用的两段代码.

问题代码

socket->async_receive增加了工作io_service.因此,io_service->run()将阻塞直到读取操作以成功或错误完成,并且ClientReceiveEvent已完成运行或抛出异常.

例3a代码

为了使其更容易理解,这里有一个较小的注释示例3a:

void CalculateFib(std::size_t n);

int main()
{
  boost::asio::io_service io_service;
  boost::optional<boost::asio::io_service::work> work =       // '. 1
      boost::in_place(boost::ref(io_service));                // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  work = boost::none;                                         // 4
  worker_threads.join_all();                                  // 5
}
Run Code Online (Sandbox Code Playgroud)

在高级别,程序将创建2个线程来处理io_service事件循环(2).这导致一个简单的线程池,它将计算Fibonacci数(3).

问题代码和此代码之间的一个主要区别是,此代码实际工作和处理程序添加到(3)之前调用io_service::run()(2 ).为了防止立即返回,创建了一个对象(1).这个目的可以防止工作失败; 因此,不会因为没有工作而返回.io_serviceio_service::run()io_service::workio_serviceio_service::run()

整体流程如下:

  1. 创建并添加添加到的io_service::work对象io_service.
  2. 创建的线程池调用io_service::run().io_service由于该io_service::work对象,这些工作线程不会返回.
  3. 添加3个计算斐波纳契数的处理程序io_service,并立即返回.工作线程,而不是主线程,可能会立即开始运行这些处理程序.
  4. 删除io_service::work对象.
  5. 等待工作线程完成运行.这只会在所有3个处理程序完成执行后才会发生,因为它们io_service既没有处理程序也没有工作.

代码可以用与原始代码相同的方式编写,其中处理程序被添加到io_service,然后io_service处理事件循环.这消除了使用的需要io_service::work,并产生以下代码:

int main()
{
  boost::asio::io_service io_service;

  io_service.post(boost::bind(CalculateFib, 3));              // '.
  io_service.post(boost::bind(CalculateFib, 4));              //   :- 3
  io_service.post(boost::bind(CalculateFib, 5));              // .'

  boost::thread_group worker_threads;                         // -.
  for(int x = 0; x < 2; ++x)                                  //   :
  {                                                           //   '.
    worker_threads.create_thread(                             //     :- 2
      boost::bind(&boost::asio::io_service::run, &io_service) //   .'
    );                                                        //   :
  }                                                           // -'
  worker_threads.join_all();                                  // 5
}
Run Code Online (Sandbox Code Playgroud)

同步与异步

虽然问题中的代码使用的是异步操作,但它正在有效地同步运行,因为它正在等待异步操作完成:

socket.async_receive(buffer, handler)
io_service.run();
Run Code Online (Sandbox Code Playgroud)

相当于:

boost::asio::error_code error;
std::size_t bytes_transferred = socket.receive(buffer, 0, error);
handler(error, bytes_transferred);
Run Code Online (Sandbox Code Playgroud)

作为一般经验法则,尽量避免混合同步和异步操作.通常,它可以将复杂的系统变成复杂的系统.这个答案突出了异步编程的优点,其中一些也包含在Boost.Asio 文档中.

  • 很棒的帖子.我想添加一件事,因为我觉得它没有得到足够的重视:在run()返回后,你需要在你的io_service上调用reset(),然后才能再次运行它.否则,无论是否等待异步操作,它都可以立即返回. (13认同)

Log*_*orn 18

为了简化run做法,将其视为必须处理一堆纸的员工; 它只需要一张纸,就可以完成纸张所说的内容,将纸张扔掉并取出下一页; 当他用完床单时,就会离开办公室.在每张纸上都可以有任何类型的指令,甚至可以添加新的纸张.回到ASIO:你可以给一个io_service使用:工作有两种方式,本质上post,或者通过使用内部调用其他对象就可以了,你挂在样品中postio_service一样,socket和它的async_*方法.