Asio 的 `io_context` 和并发提示

tst*_*ner 8 c++ multithreading boost-asio

构造asio::io_context函数采用可选的并发提示,当只有单个线程与io_contextIO 对象或关联的 IO 对象交互时(或者线程之间的同步已在调用代码中完成),该提示会跳过一些内部锁。

\n

我的理解是,这1将允许我io_context::run()在一个线程中调用并与(即,除 和等io_context之外的所有方法)以及所有关联的 IO 对象正常交互。reset()run()run_one()

\n

另外,使用 时ASIO_CONCURRENCY_HINT_UNSAFE_IO,在 IO 对象(下例中的编号 3)上调用任何 IO 方法都是非法的,并且使用 时调用其io_context本身的任何方法都是非法的ASIO_CONCURRENCY_HINT_UNSAFE。它是否正确?

\n
#include <asio/io_context.hpp>\n#include <asio/ip/tcp.hpp>\n#include <chrono>\n#include <iostream>\n#include <thread>\n\nstatic const char msg[] = "Hello World\\n";\n\nint main() {\n    const auto concurrency_hint = ASIO_CONCURRENCY_HINT_1;\n    asio::io_context ctx{concurrency_hint};\n    asio::ip::tcp::acceptor acc(ctx, asio::ip::tcp::endpoint(asio::ip::address_v4::any(), 7999));\n    acc.listen(2);\n\n    asio::ip::tcp::socket peer(ctx);\n    acc.async_accept(peer, [&peer](const asio::error_code &error) {\n        // call async methods from the thread running the io context (1)\n        peer.async_write_some(\n            asio::const_buffer(msg, 12), [&](const asio::error_code &error, std::size_t len) {\n                peer.close();\n            });\n    });\n\n    std::thread io_thread([&ctx]() { ctx.run(); });\n\n    // call `post()` from another thread (2)\n    asio::post([]() { std::cout << msg << std::flush; });\n\n    // call `async_accept` for an IO object running on another thread (3)\n    acc.async_accept([&](const asio::error_code &error, asio::ip::tcp::socket peer) {\n        peer.close();\n    });\n\n    // call `run()` while another thread is already doing so (4)\n    ctx.run_for(std::chrono::seconds(2));\n\n    std::this_thread::sleep_for(std::chrono::seconds(5));\n    // Call `io_context::stop()` from another thread (5)\n    ctx.stop();\n\n    io_thread.join();\n    return 0;\n}\n
Run Code Online (Sandbox Code Playgroud)\n
\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n \n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n
1UNSAFEUNSAFE_IOSAFE
IO 方法,同一线程 (1)\xe2\x9c\x94\xe2\x9c\x94\xe2\x9c\x94\xe2\x9c\x94
post()来自另一个线程 (2)\xe2\x9c\x94\xe2\x9c\x96\xe2\x9c\x94\xe2\x9c\x94
IO方法,另一个线程(3)\xe2\x9c\x94?\xe2\x9c\x96\xe2\x9c\x96\xe2\x9c\x94
run()来自两个线程 (4)\xe2\x9c\x96\xe2\x9c\x96\xe2\x9c\x96\xe2\x9c\x94
io_context::stop()来自另一个线程 (5)\xe2\x9c\x96\xe2\x9c\x96\xe2\x9c\x94
\n

Tig*_*ire 2

添加了禁用所有同步的功能,以使该库与所有其他“std”库保持一致,其中约定是始终让调用者处理同步。这是针对“std::network”提案的。(如果我没记错的话)。

\n

更有趣的情况是“1”和 BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO。

\n

来自https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/concurrency_hint.html我可以看到值“1”对某些工作队列使用线程本地存储。这意味着它实际上破坏了工作平衡。我认为这意味着当与更多线程一起使用时它将正确运行,但效率较低。\n抱歉,我知道这是软弱/无力的,但手册中并不清楚。该代码肯定将其称为“提示”,这通常意味着它会正确运行,但如果设置错误,则会变慢。

\n

另一个值 BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO 与完全锁定相同,除了定义 ASIO_CONCURRENCY_HINT_LOCKING_REACTOR_IO 之外。同样,从手册中并不清楚这实际上意味着什么,而且我找不到任何曾经使用过它的证据。(在我再次检查更高版本的 ASIO 后,我会稍后更新这个答案)。\n但是,从描述来看,它看起来像是一个承诺(不是暗示),没有两个完成处理程序/任何异步操作同时发生。反应堆引擎是说“当完成时执行此操作”的位,因此它将影响新异步回调的注册以及调用后回调的删除。

\n

抱歉,我知道这不是一个好的答案,但我做了一些研究,并且认为我不妨发布它。

\n
\n

编辑:更多信息

\n

我查看了 boost/asio/1.77.0

\n

我的第一个学习点是,并非所有实现都使用无锁(并发)队列。windows版本使用临界区和windows事件...TIL

\n

不过,我确实在代码中找到了相关部分。例如,epoll 版本(见下文)。注意:epoll 不是 Linux io_uring 的默认设置。

\n
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()\n{\n  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);\n  return registered_descriptors_.alloc(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(\n        REACTOR_IO, scheduler_.concurrency_hint()));\n}\n
Run Code Online (Sandbox Code Playgroud)\n

它的作用是用假互斥体或真实互斥体分配一个“状态”。这是通过 conditional_mutex 类完成的。

\n

另一个有趣的事情是标志与提示相互作用。在线程本地队列优化上切换锁定开关,如下......

\n
\nscheduler::scheduler(boost::asio::execution_context& ctx,\n    int concurrency_hint, bool own_thread, get_task_func_type get_task)\n  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),\n    one_thread_(concurrency_hint == 1\n        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(\n          SCHEDULER, concurrency_hint)\n        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(\n          REACTOR_IO, concurrency_hint)),\n    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(\n          SCHEDULER, concurrency_hint)),\n    task_(0),\n    get_task_(get_task),\n    task_interrupted_(true),\n    outstanding_work_(0),\n    stopped_(false),\n    shutdown_(false),\n    concurrency_hint_(concurrency_hint),\n    thread_(0)\n...\n
Run Code Online (Sandbox Code Playgroud)\n

不过,我发现的任何内容都与我最初的答案不符,禁用 IO 锁定似乎只是从反应器中删除锁,反应器在调用异步函数或运行完成处理程序时使用。

\n
\n

反馈评论

\n

@tstenner 质疑 io_uring 是否是 Linux 的默认设置。他的问题是对的,原来我没有检查,这取决于linux版本。\n#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,45)

\n

他还问

\n
\n

当另一个线程从 io_context::run() 中运行完成处理程序时调用例如 socket.async_write(\xe2\x80\xa6) 有问题吗?

\n
\n

答案是(通过阅读代码)是的。相同的互斥体用于保护“start_op”,它在很多地方使用,但特别是在 basic_socket/acceptor 异步函数中。\n因此,假设“start_op”需要互斥体,那么它被禁用的事实将导致您得出结论这里是潜在的U/B。

\n

在 impl 目录(用于 start_op)中快速“grep”将为您提供我将避免使用的函数列表。您可以将计时器异步函数添加到该列表中,这些函数似乎是在反应器本身中实现的。

\n