防止一段代码在协程中同时执行

IIn*_*ble 4 c++ visual-c++

我需要保护一段代码免于在协程​​中同时执行。使用std::lock_guard类模板可以简单地防止多线程环境中的并发执行。然而,我的协程是从单个线程调用的,因此该解决方案不适用。

以下是我想要完成的(伪)代码:

future<http_response> send_req_async(http_request req) {
    while (true) {
        // Attempt to send an HTTP request
        auto const& access_token{ token_store::access_token() };
        auto const response{ co_await impl::send_req_async(req, access_token) };
        if (response.status_code() == http_code::ok) {
            co_return response;
        }

        // Attempt to refresh access token
        if (response.status_code() == http_code::unauthorized) {
            // The following scope needs to be guarded against concurrent execution.
            // To guard against concurrent execution from multiple threads I would use:
            // lock_guard<mutex> guard(refresh_token_mutex);
            if (access_token != token_store::access_token()) {
                continue;
            }
            auto const& token{ co_await refresh_token(token_store::refresh_token()) };
            token_store::save_access_token(token);
            // End of section that needs to be guarded.
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

该代码旨在允许并行发出多个请求,同时仅允许尝试刷新过期访问令牌的单个协程调用。理想情况下,解决方案应该在令牌刷新操作正在进行时暂停并发协程调用,然后自动恢复(即与std::lock_guard多线程环境中的语义相同)。

协程机制或 C++ 标准库中是否内置了任何东西,可以让我以干净的方式实现这一点,还是我必须自己推出?


注意:我使用的是 Visual Studio 2017 15.7.2,因此您可以假设完全支持 C++17 及其 Coroutine TS 实现。

IIn*_*ble 5

C++ 或标准库没有提供任何基础设施来获取所需的功能。然而,协程 TS提供了实现co_await异步互斥体的构建块。

总体思路是实现一个可等待的,它在评估表达式时尝试获取合适的互斥体await_suspend。如果无法获取锁,协程将被挂起并添加到等待队列中,否则立即继续执行(保持锁)。

互斥体的unlock方法从队列中恢复等待者,除非等待者队列为空。

网络上有预先构建的解决方案。我之所以选择 Lewis Baker 的async_mutex实施有很多原因:

  • 没有外部或内部依赖。只需将编译单元和头文件放入您的项目中即可完成。
  • 锁由协程拥有,而不是线程拥有。该实现允许协程在不同的线程上恢复。
  • 这是一个无锁的实现。

此实现的使用与 a 的使用非常相似std::lock_guard

#include <cppcoro/async_mutex.hpp>

namespace {
    cppcoro::async_mutex refresh_mutex;
}

future<http_response> send_req_async(http_request req) {
    while (true) {
        // Attempt to send an HTTP request
        auto const& access_token{ token_store::access_token() };
        auto const response{ co_await impl::send_req_async(req, access_token) };
        if (response.status_code() == http_code::ok) {
            co_return response;
        }

        // Attempt to refresh access token
        if (response.status_code() == http_code::unauthorized) {
            // The following scope needs to be guarded against concurrent execution.
            auto const refresh_guard{ co_await refresh_mutex.scoped_lock_async() };
            if (access_token != token_store::access_token()) {
                continue;
            }
            auto const& token{ co_await refresh_token(token_store::refresh_token()) };
            token_store::save_access_token(token);
            // refresh_guard falls out of scope, unlocking the mutex.
            // If there are any suspended coroutines, the oldest one gets resumed.
        }
    }
}
Run Code Online (Sandbox Code Playgroud)