提升asio async_resolve对象生存期

Gus*_*avo 2 c++ boost boost-asio c++11

下面的代码旨在执行以下操作:我有一个包含boost asio的解析器对象.解析器对象包含io服务和worker,因此io服务运行函数永远不会返回.只要解析器对象处于活动状态,就可以进行异步请求.当解析器对象超出范围并且队列中仍有请求时,我想完成所有并且解析器对象被销毁.

在这种情况下,根本没有调用处理程序,我不知道为什么.我认为共享指针和一些依赖循环可能存在问题.运行valgrind报告"可能会丢失内存".

任何想法如何使这个工作,所以解析器对象保持活着,直到所有工作完成?

#include <boost/asio.hpp>
#include <memory>
#include <thread>
#include <functional>
#include <string>
#include <iostream>

struct Resolver : public std::enable_shared_from_this<Resolver> {
    boost::asio::io_service                        io_service;
    std::unique_ptr<boost::asio::io_service::work> work;
    std::unique_ptr<std::thread>                   iothread;

    struct Query : public std::enable_shared_from_this<Query>{
        std::shared_ptr<Resolver>                                       service;
        boost::asio::ip::tcp::resolver                                  resolver;
        boost::asio::ip::tcp::resolver::query                           query;
        std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler;

        Query(std::shared_ptr<Resolver> res, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> handler, const std::string &name) : resolver(res->io_service), query(name, ""), handler(handler) {
            service = res;

        }

        void start() {
                auto self = shared_from_this();
                resolver.async_resolve(query, [self](const boost::system::error_code& ec, boost::asio::ip::tcp::resolver::iterator iterator){
                    self->handler(iterator);
                });     
        }
    };

    Resolver() {
        work.reset(new boost::asio::io_service::work(io_service));
        iothread.reset(new std::thread(std::bind(&Resolver::io, this)));
    }

    ~Resolver() {
        std::cout << "Resolver destroyed" << std::endl;
        work.reset();
        iothread->join();
    }

    void io() {
        io_service.run();
    }

    void asyncResolve(const std::string &name, std::function<void(boost::asio::ip::tcp::resolver::iterator &)> fn) {
        auto query = std::make_shared<Query>(shared_from_this(), fn, name);
        query->start();
    }
};

void test(boost::asio::ip::tcp::resolver::iterator it) {
    std::cout << "Test" << std::endl;
    std::cout << it->endpoint().address().to_string() << std::endl;
}

int main(int argc, const char **argv) {
    auto res = std::make_shared<Resolver>();
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
    res->asyncResolve("stackoverflow.com", &test);
}
Run Code Online (Sandbox Code Playgroud)

seh*_*ehe 5

只运行service(io_service::run())已经确保所有异步操作都已完成(参见文档).

你已经在工作线程上做了这个,并且你加入了那个帖子,所以你应该没事!

唯一的例外是如果一个处理程序抛出,所以要非常精确,你应该处理以下异常run():应该抓住boost :: asio :: io_service :: run()引发的异常吗?

void io() { 
    // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
    for (;;) {
        try {
            io_service.run();
            break; // exited normally
        } catch (std::exception const &e) {
            std::cerr << "[Resolver] An unexpected error occurred: " << e.what();
        } catch (...) {
            std::cerr << "[Resolver] An unexpected error occurred";
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

那么......问题出在哪里?

这个问题非常挑剔,隐藏在线程和shared_ptr之间.

共享指针导致~Resolver在工作线程上运行.这意味着您不能 join()使用工作线程(因为线程永远不能自己加入).一个好的实现会抛出异常,导致进程终止.

还有更多:如果你只是main()在工作线程处理异步任务时退出,那么完成处理程序可能会全局数据std::cout被拆除之后运行.所以要实际**看*Resolver完成工作并破坏,你需要确保main不会太快退出.

简化:

现在,以下是一个简化的示例,它确实显示异步操作完成:(仍然存在问题):

#define BOOST_ASIO_ENABLE_HANDLER_TRACKING 1
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <thread>
#include <iostream>

class Resolver : public std::enable_shared_from_this<Resolver> {
    using tcp = boost::asio::ip::tcp;
    using io_service = boost::asio::io_service;

    io_service _svc;
    tcp::resolver resolver { _svc };

    boost::optional<io_service::work> work { _svc };
    std::thread _worker { [this] { event_loop(); } };

    void event_loop() { 
        // http://www.boost.org/doc/libs/1_61_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.effect_of_exceptions_thrown_from_handlers
        for (;;) {
            std::cout << __PRETTY_FUNCTION__ << "\n";
            try {
                _svc.run();
                break; // exited normally
            } catch (std::exception const &e) {
                std::cerr << "[Resolver] An unexpected error occurred: " << e.what() << "\n";
            } catch (...) {
                std::cerr << "[Resolver] An unexpected error occurred\n";
            }
        }
        std::cout << "EXIT " << __PRETTY_FUNCTION__ << "\n";
    }

  public:
    ~Resolver() {
        std::cout << __PRETTY_FUNCTION__ << "\n";
        work.reset();
    }

    using Endpoint = tcp::endpoint;
    using Callback = std::function<void(Endpoint)>;

    void asyncResolve(std::string const& name, Callback fn) {
        auto self = shared_from_this();
        resolver.async_resolve({name, ""}, [self,fn](boost::system::error_code ec, tcp::resolver::iterator it) {
                if (!ec) fn(it->endpoint());
            });
    }
};

void test_handler(Resolver::Endpoint ep) {
    std::cout << "Test: " <<  ep << "\n";
}

int main() {
    {
        auto res = std::make_shared<Resolver>();
        for (auto fqdn : {"stackoverflow.com", "google.com", "localhost"})
            res->asyncResolve(fqdn, test_handler);
    }
    std::cout << "Released shared resolver\n";

    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Main exit\n";
}
Run Code Online (Sandbox Code Playgroud)

打印:

void Resolver::event_loop()
Released shared resolver
Test: 151.101.65.69:0
Test: 172.217.17.46:0
Test: 127.0.0.1:0
Resolver::~Resolver()
terminate called without an active exception
Run Code Online (Sandbox Code Playgroud)

处理程序跟踪: 在此输入图像描述

剩下的问题

最后一个问题是现在我们没有加入线程.这是从std::thread::~thread析构函数中抛出的.这是一个棘手的问题:

  • 我们不能,join()因为我们可能那个工作线程上
  • 我们不能detach()因为那将创建一个数据竞争,其中工作线程仍然在析构函数完成后运行.

选项是:

  1. _svc::run()从析构函数调用,而不是调用join()线程.这可行,但如果服务用于更多异步任务,则可能不合适,因为副作用排队操作可能会在导致析构函数运行的线程上运行.

  2. join()如果我们不是工作线程,run()如果我们是,那就调用.这总是安全的,因为run()可以称为嵌套,并且操作仍然按预期从工作线程运行

  3. 只是用error_condition join捕获system_error异常resource_deadlock_would_occur

我会说第二个是最干净的.但是在你的简单示例中,第一个选项没有问题,因为(a)如果存在现有的解析操作,则析构函数将始终从工作线程运行(b)如果没有,则服务队列必须为空,所以run()没有任何效果.

所以这是一个修复:

~Resolver() {
    std::cout << __PRETTY_FUNCTION__ << "\n";
    work.reset();

    event_loop();
    if (_worker.joinable()) {
       if (_worker.get_id() == std::this_thread::get_id())
           _worker.detach();
       else
           _worker.join();
    }
}
Run Code Online (Sandbox Code Playgroud)

现在输出是

void Resolver::event_loop()
Released shared resolver
Test: 151.101.193.69:0
Test: 216.58.212.238:0
Test: 127.0.0.1:0
Resolver::~Resolver()
void Resolver::event_loop()
Main exit
Run Code Online (Sandbox Code Playgroud)