如何在其链的上下文中恢复执行堆栈协程?

upd*_*liu 4 c++ boost boost-asio c++11

using Yield = asio::yield_context;
using boost::system::error_code;
int Func(Yield yield) {
  error_code ec;
  asio::detail::async_result_init<Yield, void(error_code, int)> init(yield[ec]);
  std::thread th(std::bind(Process, init.handler));
  int result = init.result.get();  // <--- yield at here
  return result;
}
Run Code Online (Sandbox Code Playgroud)

如何实现,Process以便FuncFunc最初产生的链的上下文中恢复?

Tan*_*ury 9

Boost.Asio使用辅助函数,asio_handler_invoke为调用策略提供自定义点.例如,当一个Handler被a包装时strand,调用策略将导致通过strandon调用分派处理程序.如文档中所述,asio_handler_invoke应通过依赖于参数的查找来调用.

using boost::asio::asio_handler_invoke;
asio_handler_invoke(nullary_functor, &handler);
Run Code Online (Sandbox Code Playgroud)

对于堆栈协程,在产生协程时以及在调用handler_type与a相关联yield_context以恢复协程时,需要考虑各种重要细节:

  • 如果代码当前正在协同程序中运行,那么它strand与协同程序相关联.本质上,一个简单的处理程序由strand恢复协同程序包装,导致执行跳转到协同程序,阻止当前处理程序中的处理程序strand.当协同程序产生时,执行会跳回到strand处理程序,允许它完成.
  • 虽然spawn()添加工作io_service(将启动并跳转到协程的处理程序),但协程本身不起作用.为了防止io_service事件循环在协程未完成时结束,可能需要将工作添加到io_servicebefore yielding之前.
  • 在调用resume之前,堆栈协程使用a strand来帮助保证协程的产量.Asio 1.10.6/Boost 1.58能够从启动函数中安全地调用完成处理程序.先前版本要求未在启动函数内调用完成处理程序,因为其调用策略会导致协程在被挂起之前尝试恢复.dispatch()

以下是一个完整的示例,说明了这些细节:

#include <iostream>    // std::cout, std::endl
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind
#include <thread>      // std::thread
#include <utility>     // std::forward
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

template <typename CompletionToken, typename Signature>
using handler_type_t = typename boost::asio::handler_type<
  CompletionToken, Signature>::type;

template <typename Handler>
using async_result = boost::asio::async_result<Handler>;

/// @brief Helper type used to initialize the asnyc_result with the handler.
template <typename CompletionToken, typename Signature>
struct async_completion
{
  typedef handler_type_t<CompletionToken, Signature> handler_type;

  async_completion(CompletionToken&& token)
    : handler(std::forward<CompletionToken>(token)),
      result(handler)
  {}

  handler_type handler;
  async_result<handler_type> result;
};

template <typename Signature, typename CompletionToken>
typename async_result<
  handler_type_t<CompletionToken, Signature>
>::type
async_func(CompletionToken&& token, boost::asio::io_service& io_service)
{
  // The coroutine itself is not work, so guarantee the io_service has
  // work.
  boost::asio::io_service::work work(io_service);

  // Initialize the async completion handler and result.
  async_completion<CompletionToken, Signature> completion(
      std::forward<CompletionToken>(token));

  auto handler = completion.handler;
  std::cout << "Spawning thread" << std::endl;
  std::thread([](decltype(handler) handler)
    {
      // The handler will be dispatched to the coroutine's strand.
      // As this thread is not running within the strand, the handler
      // will actually be posted, guaranteeing that yield will occur
      // before the resume.
      std::cout << "Resume coroutine" << std::endl;
      using boost::asio::asio_handler_invoke;
      asio_handler_invoke(std::bind(handler, 42), &handler);
    }, handler).detach();

  // Demonstrate that the handler is serialized through the strand by
  // allowing the thread to run before suspending this coroutine.
  std::this_thread::sleep_for(std::chrono::seconds(2));

  // Yield the coroutine.  When this yields, execution transfers back to
  // a handler that is currently in the strand.  The handler will complete
  // allowing other handlers that have been posted to the strand to run.
  std::cout << "Suspend coroutine" << std::endl;
  return completion.result.get();
}

int main()
{
  boost::asio::io_service io_service;

  boost::asio::spawn(io_service,
    [&io_service](boost::asio::yield_context yield)
    {
      auto result = async_func<void(int)>(yield, io_service);
      std::cout << "Got: " << result << std::endl;
    });

  std::cout << "Running" << std::endl;
  io_service.run();
  std::cout << "Finish" << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

输出:

Running
Spawning thread
Resume coroutine
Suspend coroutine
Got: 42
Finish
Run Code Online (Sandbox Code Playgroud)

有关更多详细信息,请考虑阅读异步操作的库基础.它提供了更多的细节到异步操作的成分,如何Signature影响async_result,以及整体设计async_result,handler_typeasync_completion.

  • @updogliu关键在于强调协程不被认为是有效的.调用者可能不知道协同程序函数的实现,但可能期望`io_service`在协程仍然存活时不会失去工作. (2认同)
  • @updogliu它应该是固定的.当lambda捕获`handler`时,ADL没有选择正确的`asio_handler_invoke()`钩子.这导致处理程序在strand上下文之外运行.选择了正确的路径后,我还能够将`work`对象迁移到`async_func()`,这在我看来有点清晰. (2认同)

小智 5

这是基于 Tanner 的精彩回答的 Boost 1.66.0 的更新示例:

#include <iostream>    // std::cout, std::endl
#include <chrono>      // std::chrono::seconds
#include <functional>  // std::bind
#include <thread>      // std::thread
#include <utility>     // std::forward
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

template <typename Signature, typename CompletionToken>
auto async_add_one(CompletionToken token, int value) {
    // Initialize the async completion handler and result
    // Careful to make sure token is a copy, as completion's handler takes a reference
    using completion_type = boost::asio::async_completion<CompletionToken, Signature>;
    completion_type completion{ token };

    std::cout << "Spawning thread" << std::endl;
    std::thread([handler = completion.completion_handler, value]() {
        // The handler will be dispatched to the coroutine's strand.
        // As this thread is not running within the strand, the handler
        // will actually be posted, guaranteeing that yield will occur
        // before the resume.
        std::cout << "Resume coroutine" << std::endl;

        // separate using statement is important
        // as asio_handler_invoke is overloaded based on handler's type
        using boost::asio::asio_handler_invoke;
        asio_handler_invoke(std::bind(handler, value + 1), &handler);
    }).detach();

    // Demonstrate that the handler is serialized through the strand by
    // allowing the thread to run before suspending this coroutine.
    std::this_thread::sleep_for(std::chrono::seconds(2));

    // Yield the coroutine.  When this yields, execution transfers back to
    // a handler that is currently in the strand.  The handler will complete
    // allowing other handlers that have been posted to the strand to run.
    std::cout << "Suspend coroutine" << std::endl;
    return completion.result.get();
}

int main() {
    boost::asio::io_context io_context;

    boost::asio::spawn(
        io_context,
        [&io_context](boost::asio::yield_context yield) {
            // Here is your coroutine

            // The coroutine itself is not work, so guarantee the io_context
            // has work while the coroutine is running
            const auto work = boost::asio::make_work_guard(io_context);

            // add one to zero
            const auto result = async_add_one<void(int)>(yield, 0);
            std::cout << "Got: " << result << std::endl; // Got: 1

            // add one to one forty one
            const auto result2 = async_add_one<void(int)>(yield, 41);
            std::cout << "Got: " << result2 << std::endl; // Got: 42
        }
    );

    std::cout << "Running" << std::endl;
    io_context.run();
    std::cout << "Finish" << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

输出:

Running
Spawning thread
Resume coroutine
Suspend coroutine
Got: 1
Spawning thread
Resume coroutine
Suspend coroutine
Got: 42
Finish
Run Code Online (Sandbox Code Playgroud)

评论:

  • 极大地利用了 Tanner 的回答
  • 首选网络 TS 命名(例如,io_context)
  • boost::asio 提供了一个 async_completion 类,它封装了处理程序和 async_result。当处理程序引用 CompletionToken 时要小心,这就是现在显式复制令牌的原因。这是因为通过 async_result ( completion.result.get)产生会让关联的 CompletionToken 放弃其底层的强引用。这最终会导致协程意外提前终止。
  • 明确表示单独using boost::asio::asio_handler_invoke声明非常重要。显式调用可以防止调用正确的重载。

——

我还要提一下,我们的应用程序最终有两个 io_context,一个协程可以与之交互。特别是一个用于 I/O 绑定工作的上下文,另一个用于 CPU。使用显式链 withboost::asio::spawn最终为我们提供了对协同程序运行/恢复的上下文的明确定义的控制。这帮助我们避免了偶发的 BOOST_ASSERT(!is_running()) 失败。

使用显式链创建协程:

auto strand = std::make_shared<strand_type>(io_context.get_executor());
boost::asio::spawn(
    *strand,
    [&io_context, strand](yield_context_type yield) {
        // coroutine
    }
);
Run Code Online (Sandbox Code Playgroud)

调用显式调度到链(多 io_context 世界):

boost::asio::dispatch(*strand, [handler = completion.completion_handler, value] {
    using boost::asio::asio_handler_invoke;
    asio_handler_invoke(std::bind(handler, value), &handler);
});
Run Code Online (Sandbox Code Playgroud)

——

我们还发现在 async_result 签名中使用 future 允许在恢复时将异常传播回协程。

using bound_function = void(std::future<RETURN_TYPE>);
using completion_type = boost::asio::async_completion<yield_context_type, bound_function>;
Run Code Online (Sandbox Code Playgroud)

产量为:

auto future = completion.result.get();
return future.get(); // may rethrow exception in your coroutine's context
Run Code Online (Sandbox Code Playgroud)