我可以在不等待未来限制的情况下使用std :: async吗?

Roe*_*rel 40 c++ multithreading asynchronous c++11 stdasync

高级别
我想在异步模式下调用一些没有返回值的函数而不等待它们完成.如果我使用std :: async,则在任务结束之前,未来的对象不会被破坏,这使得调用在我的情况下不同步.

void sendMail(const std::string& address, const std::string& message)
{
    //sending the e-mail which takes some time...
}

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    //returning the response ASAP to the client
    return myResponseType;

} //<-- I'm stuck here until the async call finish to allow f to be destructed.
  // gaining no benefit from the async call.
Run Code Online (Sandbox Code Playgroud)

我的问题是

  1. 有没有办法克服这个限制?
  2. 如果(1)不是,我应该实施一个线程,将采取那些"僵尸"期货并等待它们吗?
  3. 是(1)和(2)是否,是否有任何其他选项,然后只是建立我自己的线程池?

注意:
我宁愿不使用thread + detach选项(由@ galop1n建议),因为创建一个新线程有一个我希望避免的开销.使用std :: async(至少在MSVC上)使用内部线程池.

谢谢.

Jon*_*ely 15

您可以将未来移动到全局对象中,因此当本地future的析构函数运行时,它不必等待异步线程完成.

std::vector<std::future<void>> pending_futures;

myResonseType processRequest(args...)
{
    //Do some processing and valuate the address and the message...

    //Sending the e-mail async
    auto f = std::async(std::launch::async, sendMail, address, message);

    // transfer the future's shared state to a longer-lived future
    pending_futures.push_back(std::move(f));

    //returning the response ASAP to the client
    return myResponseType;

}
Run Code Online (Sandbox Code Playgroud)

注意如果异步线程引用processRequest函数中的任何局部变量,则这不安全.

使用std::async(至少在MSVC上)使用内部线程池.

这实际上是不符合的,标准明确表示运行的任务std::launch::async必须像在新线程中一样运行,因此任何线程局部变量都不能从一个任务持久存储到另一个任务.但通常并不重要.

  • 因此,请定期浏览矢量并删除准备好的期货.您可以将其添加到`processRequest`函数中,因此每次调用它时,您都会看到是否有可以从向量中删除的现有期货.这并不复杂. (4认同)
  • 这是一个糟糕的方法,我最终会得到一个不断增长的向量(`pending_futuers`) (3认同)
  • 你的问题是如何避免在'future`析构函数中等待,我回答说.如果你想创建自己的线程池,那就没问题了(虽然我怀疑你的线程池和Windows运行时的线程池一样高效),但这并不会改变你最初提出的问题. (2认同)

gal*_*p1n 10

如果您不愿意加入,为什么不只是启动线程并分离呢?

std::thread{ sendMail, address, message}.detach();   
Run Code Online (Sandbox Code Playgroud)

std :: async绑定到它返回的std :: future的生存期,它们是无可替代的。

将std :: future放入另一个线程读取的等待队列中,将需要与接收新任务的池相同的安全机制,例如容器周围的互斥体。

因此,最好的选择是线程池,以使用直接在线程安全队列中推送的任务。并且它不依赖于特定的实现。

在采用任何可调用和参数的线程池实现下面,线程会在队列上进行轮询,更好的实现应使用条件变量(coliru):

#include <iostream>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <functional>
#include <string>

struct ThreadPool {
    struct Task {
        virtual void Run() const = 0;
        virtual ~Task() {};
    };   

    template < typename task_, typename... args_ >
    struct RealTask : public Task {
        RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {}
        void Run() const override {
            fun_();
        }
    private:
        decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_;
    };

    template < typename task_, typename... args_ >
    void AddTask( task_&& task, args_&&... args ) {
        auto lock = std::unique_lock<std::mutex>{mtx_};
        using FinalTask = RealTask<task_, args_... >;
        q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) );
    }

    ThreadPool() {
        for( auto & t : pool_ )
            t = std::thread( [=] {
                while ( true ) {
                    std::unique_ptr<Task> task;
                    {
                        auto lock = std::unique_lock<std::mutex>{mtx_};
                        if ( q_.empty() && stop_ ) 
                            break;
                        if ( q_.empty() )
                            continue;
                        task = std::move(q_.front());
                        q_.pop();
                    }
                    if (task)
                        task->Run();
                }
            } );
    }
    ~ThreadPool() {
        {
            auto lock = std::unique_lock<std::mutex>{mtx_};
            stop_ = true;
        }
        for( auto & t : pool_ )
            t.join();
    }
private:
    std::queue<std::unique_ptr<Task>> q_;
    std::thread pool_[8]; 
    std::mutex mtx_;
    volatile bool stop_ {};
};

void foo( int a, int b ) {
    std::cout << a << "." << b;
}
void bar( std::string const & s) {
    std::cout << s;
}

int main() {
    ThreadPool pool;
    for( int i{}; i!=42; ++i ) {
        pool.AddTask( foo, 3, 14 );    
        pool.AddTask( bar, " - " );    
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 该程序具有数据竞争:ThreadPool和~~ ThreadPool可能同时访问stop_。volatile对多线程没有有用的(可移植的)语义:它必须是std :: atomic或〜ThreadPool需要在保持mtx_的情况下访问它。您的线程也忙于等待,最好在队列为空时阻塞条件变量。 (7认同)
  • 您的“线程池”中的线程忙于旋转,浪费了CPU。了解如何使用条件变量。 (3认同)
  • 使用 MSVC 编译时的 std::async 使用的是内部线程池。每次自己创建一个线程都会产生我希望避免的性能开销。 (2认同)

小智 7

与其将未来移动到全局对象中(并手动管理未使用的期货的删除),您实际上可以将其移动到异步调用函数的本地范围内

“让异步函数拥有自己的未来”,可以这么说。

我想出了这个对我有用的模板包装器(在 Windows 上测试过):

#include <future>

template<class Function, class... Args>
void async_wrapper(Function&& f, Args&&... args, std::future<void>& future,
                   std::future<void>&& is_valid, std::promise<void>&& is_moved) {
    is_valid.wait(); // Wait until the return value of std::async is written to "future"
    auto our_future = std::move(future); // Move "future" to a local variable
    is_moved.set_value(); // Only now we can leave void_async in the main thread

    // This is also used by std::async so that member function pointers work transparently
    auto functor = std::bind(f, std::forward<Args>(args)...);
    functor();
}

template<class Function, class... Args> // This is what you call instead of std::async
void void_async(Function&& f, Args&&... args) {
    std::future<void> future; // This is for std::async return value
    // This is for our synchronization of moving "future" between threads
    std::promise<void> valid;
    std::promise<void> is_moved;
    auto valid_future = valid.get_future();
    auto moved_future = is_moved.get_future();

    // Here we pass "future" as a reference, so that async_wrapper
    // can later work with std::async's return value
    future = std::async(
        async_wrapper<Function, Args...>,
        std::forward<Function>(f), std::forward<Args>(args)...,
        std::ref(future), std::move(valid_future), std::move(is_moved)
    );
    valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid
    moved_future.wait(); // Wait for "future" to actually be moved
}
Run Code Online (Sandbox Code Playgroud)

我有点惊讶它的工作原理,因为我认为移动的未来的析构函数会阻塞,直到我们离开async_wrapper。它应该等待async_wrapper返回,但它正在该函数内部等待。从逻辑上讲,它应该是一个僵局,但事实并非如此。

我还尝试在async_wrapper的末尾添加一行来手动清空未来对象:

our_future = std::future<void>();
Run Code Online (Sandbox Code Playgroud)

这也不会阻塞。