Mat*_*ias · 5 · Tags: boost-asio, threadpool, c++11
Consider the functions
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>

void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i)
    {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(boost::asio::thread_pool &pool, const uint64_t a[])
{
    uint64_t r[] = {0, 0};

    boost::asio::post(pool, boost::bind(foo, a[0], &r[0]));
    boost::asio::post(pool, boost::bind(foo, a[1], &r[1]));

    pool.join();
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}
where foo is a simple "pure" function that performs a computation on begin and writes the result to the pointer *result. This function gets called with different inputs from batch, and it could be beneficial to dispatch each call to another CPU core.

Now assume the batch function is called 10,000 times. Therefore a thread pool shared between all the sequential batch calls would be nice.

Trying this (with only 3 calls for simplicity):
int main(int argn, char **)
{
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}
leads to the result
foo(2): 2 foo(4): 4
foo(3): 0 foo(5): 0
foo(7): 0 foo(9): 0
All three lines appear at the same time, while computing foo takes ~3 s each. My assumption is that only the first join really waits for the pool to complete all jobs; the others show invalid results (uninitialized values). What is the best practice for reusing the thread pool here?
The best practice is to reuse the pool (what would be the use of pooling, if you keep creating new pools?).
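The underlying problem is that thread_pool::join() waits until all outstanding work has completed and then lets the worker threads exit, so anything posted afterwards is simply never run. A minimal sketch of that behavior (assuming Boost ≥ 1.66, which introduced boost::asio::thread_pool):

#include <boost/asio.hpp>
#include <iostream>

int main() {
    boost::asio::thread_pool pool(2);

    boost::asio::post(pool, [] { std::cout << "first batch: runs" << std::endl; });
    pool.join();  // waits for the work, then the pool's worker threads exit

    boost::asio::post(pool, [] { std::cout << "second batch: never runs" << std::endl; });
    pool.join();  // returns immediately: no threads are left to run anything
}

That is why the second and third batch in the question leave their result arrays untouched.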
If you want to make sure the batches are "timed" together, I suggest using when_all on futures, so you wait on the batch's futures instead of joining (and thereby shutting down) the pool:
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

uint64_t foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return prev[0];
}
void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
    using T = boost::packaged_task<uint64_t>;

    T tasks[] {
        T(boost::bind(foo, a[0])),
        T(boost::bind(foo, a[1])),
    };

    auto all = boost::when_all(
        tasks[0].get_future(),
        tasks[1].get_future());

    for (auto& t : tasks)
        post(pool, std::move(t));

    auto [r0, r1] = all.get();
    std::cerr << "foo(" << a[0] << "): " << r0.get() << " foo(" << a[1] << "): " << r1.get() << std::endl;
}
int main() {
    boost::asio::thread_pool pool(2);

    const uint64_t a[] = {2, 4};
    batch(pool, a);

    const uint64_t b[] = {3, 5};
    batch(pool, b);

    const uint64_t c[] = {7, 9};
    batch(pool, c);
}
Prints
foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
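As an aside, nothing here depends on boost::thread's futures specifically. A sketch of the same batch using only std::packaged_task and std::future (assuming a Boost version that accepts move-only handlers in post, i.e. ≥ 1.66) waits on the per-task futures and never joins the pool:

#include <boost/asio.hpp>
#include <future>
#include <iostream>

uint64_t foo(uint64_t begin);  // same function as above

void batch(boost::asio::thread_pool &pool, const uint64_t a[2])
{
    using T = std::packaged_task<uint64_t()>;

    T tasks[] {
        T([begin = a[0]] { return foo(begin); }),
        T([begin = a[1]] { return foo(begin); }),
    };
    std::future<uint64_t> futures[] {tasks[0].get_future(), tasks[1].get_future()};

    // post both tasks first, then block on the futures
    for (auto& t : tasks)
        boost::asio::post(pool, std::move(t));

    std::cerr << "foo(" << a[0] << "): " << futures[0].get()
              << " foo(" << a[1] << "): " << futures[1].get() << std::endl;
}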
I would consider making it more flexible by not hardcoding the batch sizes. After all, the pool size is already fixed; we don't need to "make sure batches fit" or anything like that:
#define BOOST_THREAD_PROVIDES_FUTURE_WHEN_ALL_WHEN_ANY
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> const a)
{
    using T = boost::packaged_task<Result>;

    std::vector<T> tasks;
    tasks.reserve(a.size());
    for (auto begin : a)
        tasks.emplace_back(boost::bind(foo, begin));

    std::vector<boost::unique_future<T::result_type> > futures;
    for (auto& t : tasks) {
        futures.push_back(t.get_future());
        post(pool, std::move(t));
    }

    for (auto& fut : boost::when_all(futures.begin(), futures.end()).get()) {
        auto r = fut.get();
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }
    std::cout << std::endl;
}
int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});
}
Prints
foo(2): 2
foo(4): 4 foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
Contrary to popular belief (and, honestly, to what usually happens), this time we can leverage variadics to eliminate all the intermediate vectors (every single one of them):
template <typename... T>
void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    for (auto& r : {launch(a).get()...}) {
        std::cerr << "foo(" << r.begin << "): " << r.result << " ";
    }
    std::cout << std::endl;
}
If you insist on outputting the results in a timely fashion, you can still add when_all into the mix (requiring some more heroics to unpack the tuple):
template <typename... T>
void batch(boost::asio::thread_pool &pool, T... a)
{
    auto launch = [&pool](uint64_t begin) {
        boost::packaged_task<Result> pt(boost::bind(foo, begin));
        auto fut = pt.get_future();
        post(pool, std::move(pt));
        return fut;
    };

    std::apply([](auto&&... rfut) {
        Result results[] {rfut.get()...};
        for (auto& r : results) {
            std::cerr << "foo(" << r.begin << "): " << r.result << " ";
        }
    }, boost::when_all(launch(a)...).get());
    std::cout << std::endl;
}
Both still print the same result.
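For completeness, a usage sketch (the call sites are not shown above): with the variadic signature the batches are passed as plain arguments rather than vectors:

int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, 2);        // one task
    batch(pool, 4, 3, 5);  // three tasks
    batch(pool, 7, 9);     // two tasks
}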
This is all quite natural with Boost, and skips most of the complexity. But if you also want to report per batch group, you have to coordinate:
#include <iostream>
#include <boost/asio.hpp>
#include <memory>

struct Result { uint64_t begin, result; };

Result foo(uint64_t begin) {
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    return { begin, prev[0] };
}
void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
    auto group = std::make_shared<std::vector<Result> >(begins.size());

    for (size_t i = 0; i < begins.size(); ++i) {
        post(pool, [i, begin = begins.at(i), group] {
            (*group)[i] = foo(begin);

            if (group.unique()) { // the last task to finish reports the group
                for (auto& r : *group)
                    std::cout << "foo(" << r.begin << "): " << r.result << " ";
                std::cout << std::endl;
            }
        });
    }
}
int main() {
    boost::asio::thread_pool pool(2);

    batch(pool, {2});
    batch(pool, {4, 3, 5});
    batch(pool, {7, 9});

    pool.join();
}
Note that there is concurrent access to group here, which is safe due to the limitations on element accesses: each task writes only to its own element, and the vector itself is never resized.
Prints:
foo(2): 2
foo(4): 4 foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509
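If you would rather not rely on shared_ptr::unique() (it is deprecated in C++17 and removed in C++20), here is a sketch of the same "last task out reports" coordination with an explicit atomic counter; the Group type is my own illustration, not part of the original:

#include <atomic>
#include <iostream>
#include <memory>
#include <vector>
#include <boost/asio.hpp>

struct Result { uint64_t begin, result; };
Result foo(uint64_t begin);  // same function as above

void batch(boost::asio::thread_pool &pool, std::vector<uint64_t> begins) {
    // Illustrative helper: the results plus a count of tasks still running
    struct Group {
        std::vector<Result> results;
        std::atomic<size_t> pending;
    };
    auto group = std::make_shared<Group>();
    group->results.resize(begins.size());
    group->pending = begins.size();

    for (size_t i = 0; i < begins.size(); ++i) {
        post(pool, [i, begin = begins.at(i), group] {
            group->results[i] = foo(begin);

            if (group->pending.fetch_sub(1) == 1) { // last task reports the group
                for (auto& r : group->results)
                    std::cout << "foo(" << r.begin << "): " << r.result << " ";
                std::cout << std::endl;
            }
        });
    }
}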
I just ran into this advanced executor example, which is hidden from the documentation:

    I just realized that Asio comes with a fork_executor example, which does exactly this: you can "group" tasks and join the executor (which represents that group) instead of the pool. I've missed this for the longest time since none of the executor examples are listed in the HTML documentation. – sehe, 21 mins ago

So without further ado, here is that sample applied to your question:
#define BOOST_BIND_NO_PLACEHOLDERS
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/ts/executor.hpp>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// A fixed-size thread pool used to implement fork/join semantics. Functions
// are scheduled using a simple FIFO queue. Implementing work stealing, or
// using a queue based on atomic operations, are left as tasks for the reader.
class fork_join_pool : public boost::asio::execution_context {
  public:
    // The constructor starts a thread pool with the specified number of
    // threads. Note that the thread_count is not a fixed limit on the pool's
    // concurrency. Additional threads may temporarily be added to the pool if
    // they join a fork_executor.
    explicit fork_join_pool(std::size_t thread_count = std::thread::hardware_concurrency()*2)
        : use_count_(1), threads_(thread_count)
    {
      try {
        // Ask each thread in the pool to dequeue and execute functions
        // until it is time to shut down, i.e. the use count is zero.
        for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_) {
          boost::asio::dispatch(threads_, [&] {
            std::unique_lock<std::mutex> lock(mutex_);
            while (use_count_ > 0)
              if (!execute_next(lock))
                condition_.wait(lock);
          });
        }
      } catch (...) {
        stop_threads();
        threads_.join();
        throw;
      }
    }

    // The destructor waits for the pool to finish executing functions.
    ~fork_join_pool() {
      stop_threads();
      threads_.join();
    }

  private:
    friend class fork_executor;

    // The base for all functions that are queued in the pool.
    struct function_base {
      std::shared_ptr<std::size_t> work_count_;
      void (*execute_)(std::shared_ptr<function_base>& p);
    };

    // Execute the next function from the queue, if any. Returns true if a
    // function was executed, and false if the queue was empty.
    bool execute_next(std::unique_lock<std::mutex>& lock) {
      if (queue_.empty())
        return false;
      auto p(queue_.front());
      queue_.pop();
      lock.unlock();
      execute(lock, p);
      return true;
    }

    // Execute a function and decrement the outstanding work.
    void execute(std::unique_lock<std::mutex>& lock,
                 std::shared_ptr<function_base>& p) {
      std::shared_ptr<std::size_t> work_count(std::move(p->work_count_));
      try {
        p->execute_(p);
        lock.lock();
        do_work_finished(work_count);
      } catch (...) {
        lock.lock();
        do_work_finished(work_count);
        throw;
      }
    }

    // Increment outstanding work.
    void
    do_work_started(const std::shared_ptr<std::size_t>& work_count) noexcept {
      if (++(*work_count) == 1)
        ++use_count_;
    }

    // Decrement outstanding work. Notify waiting threads if we run out.
    void
    do_work_finished(const std::shared_ptr<std::size_t>& work_count) noexcept {
      if (--(*work_count) == 0) {
        --use_count_;
        condition_.notify_all();
      }
    }

    // Dispatch a function, executing it immediately if the queue is already
    // loaded. Otherwise adds the function to the queue and wakes a thread.
    void do_dispatch(std::shared_ptr<function_base> p,
                     const std::shared_ptr<std::size_t>& work_count) {
      std::unique_lock<std::mutex> lock(mutex_);
      if (queue_.size() > thread_count_ * 16) {
        do_work_started(work_count);
        lock.unlock();
        execute(lock, p);
      } else {
        queue_.push(p);
        do_work_started(work_count);
        condition_.notify_one();
      }
    }

    // Add a function to the queue and wake a thread.
    void do_post(std::shared_ptr<function_base> p,
                 const std::shared_ptr<std::size_t>& work_count) {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(p);
      do_work_started(work_count);
      condition_.notify_one();
    }

    // Ask all threads to shut down.
    void stop_threads() {
      std::lock_guard<std::mutex> lock(mutex_);
      --use_count_;
      condition_.notify_all();
    }

    std::mutex mutex_;
    std::condition_variable condition_;
    std::queue<std::shared_ptr<function_base>> queue_;
    std::size_t use_count_;
    std::size_t thread_count_;
    boost::asio::thread_pool threads_;
};

// A class that satisfies the Executor requirements. Every function or piece of
// work associated with a fork_executor is part of a single, joinable group.
class fork_executor {
  public:
    fork_executor(fork_join_pool& ctx)
        : context_(ctx), work_count_(std::make_shared<std::size_t>(0)) {}

    fork_join_pool& context() const noexcept { return context_; }

    void on_work_started() const noexcept {
      std::lock_guard<std::mutex> lock(context_.mutex_);
      context_.do_work_started(work_count_);
    }

    void on_work_finished() const noexcept {
      std::lock_guard<std::mutex> lock(context_.mutex_);
      context_.do_work_finished(work_count_);
    }

    template <class Func, class Alloc>
    void dispatch(Func&& f, const Alloc& a) const {
      auto p(std::allocate_shared<exFun<Func>>(
          typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
          std::move(f), work_count_));
      context_.do_dispatch(p, work_count_);
    }

    template <class Func, class Alloc> void post(Func f, const Alloc& a) const {
      auto p(std::allocate_shared<exFun<Func>>(
          typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
          std::move(f), work_count_));
      context_.do_post(p, work_count_);
    }

    template <class Func, class Alloc>
    void defer(Func&& f, const Alloc& a) const {
      post(std::forward<Func>(f), a);
    }

    friend bool operator==(const fork_executor& a, const fork_executor& b) noexcept {
      return a.work_count_ == b.work_count_;
    }

    friend bool operator!=(const fork_executor& a, const fork_executor& b) noexcept {
      return a.work_count_ != b.work_count_;
    }

    // Block until all work associated with the executor is complete. While it
    // is waiting, the thread may be borrowed to execute functions from the
    // queue.
    void join() const {
      std::unique_lock<std::mutex> lock(context_.mutex_);
      while (*work_count_ > 0)
        if (!context_.execute_next(lock))
          context_.condition_.wait(lock);
    }

  private:
    template <class Func> struct exFun : fork_join_pool::function_base {
      explicit exFun(Func f, const std::shared_ptr<std::size_t>& w)
          : function_(std::move(f)) {
        work_count_ = w;
        execute_ = [](std::shared_ptr<fork_join_pool::function_base>& p) {
          Func tmp(std::move(static_cast<exFun*>(p.get())->function_));
          p.reset();
          tmp();
        };
      }

      Func function_;
    };

    fork_join_pool& context_;
    std::shared_ptr<std::size_t> work_count_;
};

// Helper class to automatically join a fork_executor when exiting a scope.
class join_guard {
  public:
    explicit join_guard(const fork_executor& ex) : ex_(ex) {}
    join_guard(const join_guard&) = delete;
    join_guard(join_guard&&) = delete;
    ~join_guard() { ex_.join(); }

  private:
    fork_executor ex_;
};

//------------------------------------------------------------------------------

#include <algorithm>
#include <iostream>
#include <random>
#include <vector>
#include <boost/bind.hpp>

static void foo(const uint64_t begin, uint64_t *result)
{
    uint64_t prev[] = {begin, 0};
    for (uint64_t i = 0; i < 1000000000; ++i) {
        const auto tmp = (prev[0] + prev[1]) % 1000;
        prev[1] = prev[0];
        prev[0] = tmp;
    }
    *result = prev[0];
}

void batch(fork_join_pool &pool, const uint64_t (&a)[2])
{
    uint64_t r[] = {0, 0};
    {
        fork_executor fork(pool);
        join_guard join(fork);
        boost::asio::post(fork, boost::bind(foo, a[0], &r[0]));
        boost::asio::post(fork, boost::bind(foo, a[1], &r[1]));
        // fork.join(); // or let join_guard destructor run
    }
    std::cerr << "foo(" << a[0] << "): " << r[0] << " foo(" << a[1] << "): " << r[1] << std::endl;
}

int main() {
    fork_join_pool pool;

    batch(pool, {2, 4});
    batch(pool, {3, 5});
    batch(pool, {7, 9});
}

Prints:
foo(2): 2 foo(4): 4
foo(3): 503 foo(5): 505
foo(7): 507 foo(9): 509

Notes:
You can get a feel for it much more easily when looking at the library example (which does a recursive divide-and-conquer merge sort).