Boost asio thread_pool join does not wait for tasks to be finished

Mat*_*ias 5 boost-asio threadpool c++11

Consider the functions

#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i)
    {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[])
{
    uint64_t r[] = {0, 0};
    boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
    boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));

    pool.join();
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

where foo is a simple "pure" function that performs a calculation on begin and writes the result to the pointer *result. This function gets called with different inputs from batch, and here dispatching each call to another CPU core might be beneficial.

Now assume batch is called some 10 000 times. A thread pool shared between all the sequential batch calls would therefore be nice.

Trying this out (with only 3 calls for simplicity):

int main(int argn, char **)
{
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

results in the output

foo(2): 2 foo(4): 4
foo(3): 0 foo(5): 0
foo(7): 0 foo(9): 0

All three lines appear at the same time, while the computation of foo takes ~3s each. My assumption is that only the first join really waits for the pool to complete all the jobs; the other results are invalid (uninitialized values). What is the best practice for reusing the thread pool here?

seh*_*ehe 6

The best practice is: don't reuse the pool (what would pooling be for, if you keep creating new pools?).
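
For completeness, a minimal sketch of the most direct fix implied by that advice: give each batch its own short-lived pool, so that join() keeps its one-shot semantics (this is just the question's code rearranged, nothing new):

void batch(const uint64_t a[2])
{
    boost::asio::thread_pool pool(2); // fresh pool per batch, joined exactly once
    uint64_t r[] = {0, 0};
    boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
    boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));

    pool.join(); // safe: this pool is never touched again afterwards
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}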

If you want to be sure to "time" the batches together, I'd suggest using when_all on futures:

Live On Coliru

#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

uint64_t foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
    using T = boost::packaged_task<uint64_t>;

    T tasks[] {
        T(boost::bind(foo, a[0])),
        T(boost::bind(foo, a[1])),
    };

    auto all = boost::when_all(
        tasks[0].get_future(),
        tasks[1].get_future());

    for (auto& t : tasks)
        post(pool, std::move(t));

    auto [r0, r1] = all.get();
    std::cerr << "foo(" << a[0] << "): " << r0.get() << " foo(" << a[1] << "): " << r1.get() << std::endl;
}

int main() {
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}

Prints

foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509

I would consider:

  • generalizing
  • message queuing

Generalized

Make it a lot more flexible by not hardcoding the batch sizes. After all, the pool size is already fixed; we don't need to "make sure batches fit" or anything:

Live On Coliru

#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}

void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> const a)
{
    using T = boost::packaged_task<Result>;
    std::vector<T> tasks;
    tasks.reserve(a.size());

    for(auto begin : a)
        tasks.emplace_back(boost::bind(foo, begin));

    std::vector<boost::unique_future<T::result_type> > futures;
    for (auto& t : tasks) {
        futures.push_back(t.get_future());
        post(pool, std::move(t));
    }

    for (auto& fut : boost::when_all(futures.begin(), futures.end()).get()) {
        auto r = fut.get();
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }
    std::cout << std::endl;
}

int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});
}

Prints

foo(2): 2 
foo(4): 4 foo(3): 503 foo(5): 505 
foo(7): 507 foo(9): 509 

Generalized2: Variadics Simplify

Contrary to popular belief (and honestly, what usually happens), this time we can leverage variadics to get rid of all the intermediate vectors (every single one of them):

Live On Coliru

template <typename... T>
void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    for (auto& r : {launch(a).get()...}) {
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }

    std::cout << std::endl;
}

If you insist on outputting the results in time, you can still add when_all into the mix (requiring a few more heroics to unpack the tuple):

Live On Coliru

template <typename...T>
void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    std::apply([](auto&&... rfut) {
        Result results[] {rfut.get()...};
        for (auto& r : results) {
            std::cerr << "foo(" << r.begin << "): " << r.result << " ";
        }
    }, boost::when_all(launch(a)...).get());

    std::cout << std::endl;
}

Both still print the same result.

Message Queuing

This is very natural to boost, and sort of skips most of the complexity. If you also want to report per batched group, you'd have to coordinate:

Live On Coliru

#include <iostream>
#include <boost/asio.hpp>
#include <memory>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}

using Group = std::shared_ptr<size_t>;
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
    auto group = std::make_shared<std::vector<Result> >(begins.size());

    for (size_t i=0; i < begins.size(); ++i) {
        post(pool, [i,begin=begins.at(i),group] {
              (*group)[i] = foo(begin);
              if (group.unique()) {
                  for (auto& r : *group) {
                      std::cout << "foo(" << r.begin << "): " << r.result << " ";
                      std::cout << std::endl;
                  }
              }
          });
    }
}

int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});
    pool.join();
}

Note that this is concurrent access to group, which is safe due to the limitations on element access: each task writes only to its own element.
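
As an aside, shared_ptr::unique() is deprecated since C++17. A hypothetical variant of the same coordination that counts down with an explicit std::atomic instead (the remaining counter is my own addition, not part of the answer above) might look like:

#include <atomic>

void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
    auto group     = std::make_shared<std::vector<Result>>(begins.size());
    auto remaining = std::make_shared<std::atomic<size_t>>(begins.size());

    for (size_t i = 0; i < begins.size(); ++i) {
        post(pool, [i, begin = begins.at(i), group, remaining] {
            (*group)[i] = foo(begin);           // each task writes only its own slot
            if (remaining->fetch_sub(1) == 1) { // the last task to finish reports the group
                for (auto& r : *group)
                    std::cout << "foo(" << r.begin << "): " << r.result << " ";
                std::cout << std::endl;
            }
        });
    }
}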

Prints:

foo(2): 2 
foo(4): 4 foo(3): 503 foo(5): 505 
foo(7): 507 foo(9): 509 


seh*_*ehe 3

I just came across this advanced executor example, which is hidden away in the documentation:

"I just realized Asio comes with a fork_executor example which does exactly this: you can "group" tasks and join the executor (which represents that group) instead of the pool. I've missed this for the longest time since none of the executor examples are listed in the HTML documentation" – sehe, 21 mins ago

So, without further ado, here is that sample adapted to your question:


Live On Coliru

#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/ts/executor.hpp>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// A fixed-size thread pool used to implement fork/join semantics. Functions
// are scheduled using a simple FIFO queue. Implementing work stealing, or
// using a queue based on atomic operations, are left as tasks for the reader.
class fork_join_pool : public boost::asio::execution_context {
  public:
    // The constructor starts a thread pool with the specified number of
    // threads. Note that the thread_count is not a fixed limit on the pool's
    // concurrency. Additional threads may temporarily be added to the pool if
    // they join a fork_executor.
    explicit fork_join_pool(std::size_t thread_count = std::thread::hardware_concurrency()*2)
            : use_count_(1), threads_(thread_count)
    {
        try {
            // Ask each thread in the pool to dequeue and execute functions
            // until it is time to shut down, i.e. the use count is zero.
            for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_) {
                boost::asio::dispatch(threads_, [&] {
                    std::unique_lock<std::mutex> lock(mutex_);
                    while (use_count_ > 0)
                        if (!execute_next(lock))
                            condition_.wait(lock);
                });
            }
        } catch (...) {
            stop_threads();
            threads_.join();
            throw;
        }
    }

    // The destructor waits for the pool to finish executing functions.
    ~fork_join_pool() {
        stop_threads();
        threads_.join();
    }

  private:
    friend class fork_executor;

    // The base for all functions that are queued in the pool.
    struct function_base {
        std::shared_ptr<std::size_t> work_count_;
        void (*execute_)(std::shared_ptr<function_base>& p);
    };

    // Execute the next function from the queue, if any. Returns true if a
    // function was executed, and false if the queue was empty.
    bool execute_next(std::unique_lock<std::mutex>& lock) {
        if (queue_.empty())
            return false;
        auto p(queue_.front());
        queue_.pop();
        lock.unlock();
        execute(lock, p);
        return true;
    }

    // Execute a function and decrement the outstanding work.
    void execute(std::unique_lock<std::mutex>& lock,
                 std::shared_ptr<function_base>& p) {
        std::shared_ptr<std::size_t> work_count(std::move(p->work_count_));
        try {
            p->execute_(p);
            lock.lock();
            do_work_finished(work_count);
        } catch (...) {
            lock.lock();
            do_work_finished(work_count);
            throw;
        }
    }

    // Increment outstanding work.
    void
    do_work_started(const std::shared_ptr<std::size_t>& work_count) noexcept {
        if (++(*work_count) == 1)
            ++use_count_;
    }

    // Decrement outstanding work. Notify waiting threads if we run out.
    void
    do_work_finished(const std::shared_ptr<std::size_t>& work_count) noexcept {
        if (--(*work_count) == 0) {
            --use_count_;
            condition_.notify_all();
        }
    }

    // Dispatch a function, executing it immediately if the queue is already
    // loaded. Otherwise adds the function to the queue and wakes a thread.
    void do_dispatch(std::shared_ptr<function_base> p,
                     const std::shared_ptr<std::size_t>& work_count) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (queue_.size() > thread_count_ * 16) {
            do_work_started(work_count);
            lock.unlock();
            execute(lock, p);
        } else {
            queue_.push(p);
            do_work_started(work_count);
            condition_.notify_one();
        }
    }

    // Add a function to the queue and wake a thread.
    void do_post(std::shared_ptr<function_base> p,
                 const std::shared_ptr<std::size_t>& work_count) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(p);
        do_work_started(work_count);
        condition_.notify_one();
    }

    // Ask all threads to shut down.
    void stop_threads() {
        std::lock_guard<std::mutex> lock(mutex_);
        --use_count_;
        condition_.notify_all();
    }

    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<std::shared_ptr<function_base>> queue_;
    std::size_t use_count_;
    std::size_t thread_count_;
    boost::asio::thread_pool threads_;
};

// A class that satisfies the Executor requirements. Every function or piece of
// work associated with a fork_executor is part of a single, joinable group.
class fork_executor {
  public:
    fork_executor(fork_join_pool& ctx)
            : context_(ctx), work_count_(std::make_shared<std::size_t>(0)) {}

    fork_join_pool& context() const noexcept { return context_; }

    void on_work_started() const noexcept {
        std::lock_guard<std::mutex> lock(context_.mutex_);
        context_.do_work_started(work_count_);
    }

    void on_work_finished() const noexcept {
        std::lock_guard<std::mutex> lock(context_.mutex_);
        context_.do_work_finished(work_count_);
    }

    template <class Func, class Alloc>
    void dispatch(Func&& f, const Alloc& a) const {
        auto p(std::allocate_shared<exFun<Func>>(
            typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
            std::move(f), work_count_));
        context_.do_dispatch(p, work_count_);
    }

    template <class Func, class Alloc> void post(Func f, const Alloc& a) const {
        auto p(std::allocate_shared<exFun<Func>>(
            typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
            std::move(f), work_count_));
        context_.do_post(p, work_count_);
    }

    template <class Func, class Alloc>
    void defer(Func&& f, const Alloc& a) const {
        post(std::forward<Func>(f), a);
    }

    friend bool operator==(const fork_executor& a, const fork_executor& b) noexcept {
        return a.work_count_ == b.work_count_;
    }

    friend bool operator!=(const fork_executor& a, const fork_executor& b) noexcept {
        return a.work_count_ != b.work_count_;
    }

    // Block until all work associated with the executor is complete. While it
    // is waiting, the thread may be borrowed to execute functions from the
    // queue.
    void join() const {
        std::unique_lock<std::mutex> lock(context_.mutex_);
        while (*work_count_ > 0)
            if (!context_.execute_next(lock))
                context_.condition_.wait(lock);
    }

  private:
    template <class Func> struct exFun : fork_join_pool::function_base {
        explicit exFun(Func f, const std::shared_ptr<std::size_t>& w)
                : function_(std::move(f)) {
            work_count_ = w;
            execute_ = [](std::shared_ptr<fork_join_pool::function_base>& p) {
                Func tmp(std::move(static_cast<exFun*>(p.get())->function_));
                p.reset();
                tmp();
            };
        }

        Func function_;
    };

    fork_join_pool& context_;
    std::shared_ptr<std::size_t> work_count_;
};

// Helper class to automatically join a fork_executor when exiting a scope.
class join_guard {
  public:
    explicit join_guard(const fork_executor& ex) : ex_(ex) {}
    join_guard(const join_guard&) = delete;
    join_guard(join_guard&&) = delete;
    ~join_guard() { ex_.join(); }

  private:
    fork_executor ex_;
};

//------------------------------------------------------------------------------

#include <algorithm>
#include <iostream>
#include <random>
#include <vector>
#include <boost/bind.hpp>

static void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(fork_join_pool &pool, const uint64_t (&a)[2])
{
    uint64_t r[] = {0, 0};
    {
        fork_executor fork(pool);
        join_guard join(fork);
        boost::asio::post(fork, boost::bind(foo, a[0], &r[0]));
        boost::asio::post(fork, boost::bind(foo, a[1], &r[1]));
        // fork.join(); // or let join_guard destructor run
    }
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

int main() {
    fork_join_pool pool;

    batch(pool, {2, 4});
    batch(pool, {3, 5});
    batch(pool, {7, 9});
}

Prints:

foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509

Notes:

  • executors can overlap/nest: you can use separate joinable fork_executors on a single fork_join_pool, and they will join distinct groups of tasks for each executor (see the sketch below)

You can get a good feel for this by looking at the library sample (which does a recursive divide-and-conquer merge sort).
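
To illustrate the first note, a hypothetical sketch (reusing fork_join_pool, fork_executor and foo from the sample above; the particular grouping is mine):

int main() {
    fork_join_pool pool;

    fork_executor groupA(pool), groupB(pool); // two task groups sharing one pool
    uint64_t r[4] = {0, 0, 0, 0};

    boost::asio::post(groupA, boost::bind(foo, 2, &r[0]));
    boost::asio::post(groupA, boost::bind(foo, 4, &r[1]));
    boost::asio::post(groupB, boost::bind(foo, 3, &r[2]));
    boost::asio::post(groupB, boost::bind(foo, 5, &r[3]));

    groupA.join(); // waits only for groupA's tasks; the waiting thread may help run queued work
    groupB.join(); // waits only for groupB's tasks
}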
