Cancelling a boost asio deadline timer safely

hud*_*dac 8 c++ multithreading boost timer boost-asio

I'm trying to cancel a boost::asio::basic_waitable_timer<std::chrono::steady_clock> safely.

According to this answer, this code should do the job:

timer.get_io_service().post([&]{timer.cancel();})

I'm afraid it doesn't work for me.
Am I doing something wrong?
This is my code:

#include <iostream>
#include "boost/asio.hpp"
#include <atomic>
#include <chrono>
#include <thread>
#include <random>

boost::asio::io_service io_service;
boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer(io_service);
std::atomic<bool> started;

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout\n";
        timer.expires_from_now(std::chrono::milliseconds(10));
        timer.async_wait(&handle_timeout);
    } else if (ec == boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout aborted\n";
    } else {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout another error\n";
    }
}

int main() {

    std::cout << "tid: " << std::this_thread::get_id() << ", Hello, World!" << std::endl;
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> dis(1, 100);

    for (auto i = 0; i < 1000; i++) {

        started = false;
        std::thread t([&](){

            timer.expires_from_now(std::chrono::milliseconds(0));
            timer.async_wait(&handle_timeout);

            io_service.run();
        });

        while (!started) {};
        auto sleep = dis(gen);
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", sleeps for " << sleep << " [ms]" << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(sleep));
        timer.get_io_service().post([](){
            std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
            timer.cancel();
        });
//      timer.cancel();
        std::cout << "tid: " << std::this_thread::get_id() << ", i: " << i << ", waiting for thread to join()" << std::endl;
        t.join();
        io_service.reset();
    }

    return 0;
}

This is the output:

... ...
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 2, waiting for thread to join()
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout aborted
tid: 140737353967488, i: 3, sleeps for 21 [ms]
tid: 140737335076608, handle_timeout
tid: 140737353967488, i: 3, waiting for thread to join()
tid: 140737335076608, handle_timeout
tid: 140737335076608, cancelling in post
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
tid: 140737335076608, handle_timeout
...
continues forever...

As you can see, timer.cancel() is being called from the appropriate thread:

tid: 140737335076608, cancelling in post

BUT there is no

tid: 140737335076608, handle_timeout aborted

afterwards.

Main waits forever.

seh*_*ehe 13

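Cancellation is safe.

It's just not robust. You didn't account for the case where the timer is not pending. In that case you cancel it once, but as soon as the completion handler is invoked it simply starts a new asynchronous wait.

What follows are the detailed steps of how I traced the issue.

Summary (TL;DR)

Cancelling a timer only cancels asynchronous operations that are in flight.

If you want to shut down an asynchronous call chain, you have to use additional logic for that. An example is given below.

Handler Tracking

Enable it by defining the following before any Asio header is included: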

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1

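This produces output that can be visualized with boost/libs/asio/tools/handlerviz.pl:

A successful trace

[image: handlerviz graph of the successful trace]

As you can see, the async_wait is in flight when the cancellation happens.

A "bad" trace

(truncated, because it would run forever)

[image: handlerviz graph of the "bad" trace]

Note how the completion handler sees cc=system:0, not cc=system:125 (for operation_aborted). This is a symptom of the fact that the posted cancel did not actually "take". The only logical explanation (not visible in the diagram) is that the timer had already expired before the cancel was invoked.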

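Let's compare the raw traces¹

[image: comparison of the raw traces]

¹ with the noisy differences removed

Detecting It

So, we have a lead. Can we detect it?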

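    timer.get_io_service().post([](){
        std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
        // expires_from_now() is negative once the expiry time has passed
        if (timer.expires_from_now() >= std::chrono::steady_clock::duration(0)) {
            timer.cancel();
        } else {
            // already expired: cancel() may no longer be able to abort the handler
            std::cout << "PANIC\n";
            timer.cancel();
        }
    });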

Prints:

tid: 140113177143232, i: 0, waiting for thread to join()
tid: 140113177143232, i: 1, waiting for thread to join()
tid: 140113177143232, i: 2, waiting for thread to join()
tid: 140113177143232, i: 3, waiting for thread to join()
tid: 140113177143232, i: 4, waiting for thread to join()
tid: 140113177143232, i: 5, waiting for thread to join()
tid: 140113177143232, i: 6, waiting for thread to join()
tid: 140113177143232, i: 7, waiting for thread to join()
tid: 140113177143232, i: 8, waiting for thread to join()
tid: 140113177143232, i: 9, waiting for thread to join()
tid: 140113177143232, i: 10, waiting for thread to join()
tid: 140113177143232, i: 11, waiting for thread to join()
tid: 140113177143232, i: 12, waiting for thread to join()
tid: 140113177143232, i: 13, waiting for thread to join()
tid: 140113177143232, i: 14, waiting for thread to join()
tid: 140113177143232, i: 15, waiting for thread to join()
tid: 140113177143232, i: 16, waiting for thread to join()
tid: 140113177143232, i: 17, waiting for thread to join()
tid: 140113177143232, i: 18, waiting for thread to join()
tid: 140113177143232, i: 19, waiting for thread to join()
tid: 140113177143232, i: 20, waiting for thread to join()
tid: 140113177143232, i: 21, waiting for thread to join()
tid: 140113177143232, i: 22, waiting for thread to join()
tid: 140113177143232, i: 23, waiting for thread to join()
tid: 140113177143232, i: 24, waiting for thread to join()
tid: 140113177143232, i: 25, waiting for thread to join()
tid: 140113177143232, i: 26, waiting for thread to join()
PANIC

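Could we communicate "super-cancellation" in another, clearer way? We have only the timer object to work with:

Signaling Shutdown

The timer object doesn't have many properties to work with. There is no close() or similar, as there is on a socket, that could be used to put the timer into some kind of invalid state.

However, there is the expiry time point, and we can use a special domain value to signal "invalid" for our application: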

timer.get_io_service().post([](){
    std::cerr << "tid: " << std::this_thread::get_id() << ", cancelling in post\n";
    // Timer is assumed to be an alias for the type of `timer`.
    // Setting a new expiry also cancels any pending wait:
    timer.expires_at(Timer::clock_type::time_point::min());
});

This "special value" is easy to handle in the completion handler:

void handle_timeout(const boost::system::error_code& ec)
{
    if (!ec) {
        started = true;
        // re-arm only if shutdown has not been signaled via the sentinel expiry
        // (Timer is assumed to be an alias for the type of `timer`)
        if (timer.expires_at() != Timer::time_point::min()) {
            timer.expires_from_now(std::chrono::milliseconds(10));
            timer.async_wait(&handle_timeout);
        } else {
            std::cerr << "handle_timeout: detected shutdown\n";
        }
    }
    else if (ec != boost::asio::error::operation_aborted) {
        std::cerr << "tid: " << std::this_thread::get_id() << ", handle_timeout error " << ec.message() << "\n";
    }
}

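  • Nice workaround, but... don't you think there should be a better cancel function that hides all this mess in its implementation details? Cancellation-related questions come up again and again... (2 upvotes)
  • @hudac I only confirmed that your use of it is thread-safe; I didn't actually claim anything more. Your use of it is safe __because__ you post it to the service __and__ the service runs on a single thread, which means you get "implicit strand" behaviour (no two handlers run concurrently). (2 upvotes)
  • @hudac More to the point, this is not a rule of thumb once you run the service on more threads! In that case you need a strand to synchronize access to the service objects (like `deadline_timer`). See http://stackoverflow.com/questions/12794107/why-do-i-need-strand-per-connection-when-using-boostasio/12801042#12801042. I hope this makes clear that, according to the documentation, `cancel()` is **not** thread-safe (nobody said it was). (2 upvotes)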