从另一个线程恢复asio协同程序

bob*_*eff 4 c++ multithreading boost-asio

我有从另一个线程恢复boost :: asio coroutine 的问题.这是示例代码:

#include <iostream>
#include <thread>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>

using namespace std;
using namespace boost;

void foo(asio::steady_timer& timer, asio::yield_context yield)
{
    cout << "Enter foo" << endl;
    timer.expires_from_now(asio::steady_timer::clock_type::duration::max());
    timer.async_wait(yield);
    cout << "Leave foo" << endl;
}

void bar(asio::steady_timer& timer)
{
    cout << "Enter bar" << endl;
    sleep(1); // wait a little for asio::io_service::run to be executed
    timer.cancel();
    cout << "Leave bar" << endl;
}

int main()
{
    asio::io_service ioService;
    asio::steady_timer timer(ioService);

    asio::spawn(ioService, bind(foo, std::ref(timer), placeholders::_1));

    thread t(bar, std::ref(timer));

    ioService.run();
    t.join();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

问题是asio :: steady_timer对象不是线程安全的,程序崩溃了.但是,如果我尝试使用互斥锁来同步对它的访问,那么我就会遇到死锁,因为foo的范围没有留下.

#include <iostream>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/spawn.hpp>

using namespace std;
using namespace boost;

void foo(asio::steady_timer& timer, mutex& mtx, asio::yield_context yield)
{
    cout << "Enter foo" << endl;

    {
        lock_guard<mutex> lock(mtx);
        timer.expires_from_now(
            asio::steady_timer::clock_type::duration::max());
        timer.async_wait(yield);
    }

    cout << "Leave foo" << endl;
}

void bar(asio::steady_timer& timer, mutex& mtx)
{
    cout << "Enter bar" << endl;
    sleep(1); // wait a little for asio::io_service::run to be executed

    {
        lock_guard<mutex> lock(mtx);
        timer.cancel();
    }

    cout << "Leave bar" << endl;
}

int main()
{
    asio::io_service ioService;
    asio::steady_timer timer(ioService);
    mutex mtx;

    asio::spawn(ioService, bind(foo, std::ref(timer), std::ref(mtx),
        placeholders::_1));

    thread t(bar, std::ref(timer), std::ref(mtx));

    ioService.run();
    t.join();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

如果我使用标准的完成处理程序而不是协同程序,就没有这样的问题.

#include <iostream>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/asio/steady_timer.hpp>

using namespace std;
using namespace boost;

void baz(system::error_code ec)
{
    cout << "Baz: " << ec.message() << endl;
}

void foo(asio::steady_timer& timer, mutex& mtx)
{
    cout << "Enter foo" << endl;
    {
        lock_guard<mutex> lock(mtx);
        timer.expires_from_now(
            asio::steady_timer::clock_type::duration::max());
        timer.async_wait(baz);
    }
    cout << "Leave foo" << endl;
}

void bar(asio::steady_timer& timer, mutex& mtx)
{
    cout << "Enter bar" << endl;
    sleep(1); // wait a little for asio::io_service::run to be executed
    {
        lock_guard<mutex> lock(mtx);
        timer.cancel();
    }
    cout << "Leave bar" << endl;
}

int main()
{
    asio::io_service ioService;
    asio::steady_timer timer(ioService);
    mutex mtx;

    foo(std::ref(timer), std::ref(mtx));

    thread t(bar, std::ref(timer), std::ref(mtx));

    ioService.run();
    t.join();

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

当使用couroutines时,是否可能具有与上一个示例类似的行为.

Tan*_*ury 6

一个协程在a的上下文中运行strand.在spawn(),如果没有明确提供,strand将为协程创建一个新的.通过显式地提供strandspawn(),一个可以发布工作纳入strand将与协程同步.

此外,如上所述,如果协程在一个线程中运行,获取互斥锁,然后挂起,但在另一个线程中恢复并运行并释放锁,则可能会发生未定义的行为.为了避免这种情况,理想情况下,当协程暂停时,不应该持有锁.但是,如果有必要,必须保证协程在恢复时在同一个线程内运行,例如只运行io_service单个线程.


这是基于原始示例的最小完整示例,其中bar()帖子工作到strand取消计时器,导致foo()协程恢复:

#include <iostream>
#include <thread>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>

void foo(boost::asio::steady_timer& timer, boost::asio::yield_context yield)
{
  std::cout << "Enter foo" << std::endl;

  timer.expires_from_now(
      boost::asio::steady_timer::clock_type::duration::max());
  boost::system::error_code error;
  timer.async_wait(yield[error]);
  std::cout << "foo error: " << error.message() << std::endl;

  std::cout << "Leave foo" << std::endl;
}

void bar(
  boost::asio::io_service::strand& strand,
  boost::asio::steady_timer& timer
)
{
  std::cout << "Enter bar" << std::endl;

  // Wait a little for asio::io_service::run to be executed
  std::this_thread::sleep_for(std::chrono::seconds(1));
  // Post timer cancellation into the strand.
  strand.post([&timer]()
    {
      timer.cancel();
    });

  std::cout << "Leave bar" << std::endl;
}

int main()
{
  boost::asio::io_service io_service;
  boost::asio::steady_timer timer(io_service);
  boost::asio::io_service::strand strand(io_service);

  // Use an explicit strand, rather than having the io_service create.
  boost::asio::spawn(strand, std::bind(&foo, 
      std::ref(timer), std::placeholders::_1));

  // Pass the same strand to the thread, so that the thread may post
  // handlers synchronized with the foo coroutine.
  std::thread t(&bar, std::ref(strand), std::ref(timer));

  io_service.run();
  t.join();
}
Run Code Online (Sandbox Code Playgroud)

其中提供以下输出:

Enter foo
Enter bar
foo error: Operation canceled
Leave foo
Leave bar
Run Code Online (Sandbox Code Playgroud)

答案所述,当boost::asio::yield_context检测到异步操作失败时,例如取消操作时,它会转换boost::system::error_codesystem_error异常并抛出.上面的例子用于yield_context::operator[]允许yield_context填充error_code失败时提供的而不是抛出投掷.