Vit*_*meo 6 c++ recursion multithreading asynchronous c++14
这个问题通过一个简化的例子更容易解释(因为我的真实情况远非"最小"):给定...
template <typename T>
void post_in_thread_pool(T&& f)
Run Code Online (Sandbox Code Playgroud)
...函数模板,我想创建一个具有树状递归结构的并行异步算法.我将使用std::count_if占位符编写下面结构的示例.我将要使用的策略如下:
如果我检查的范围的长度小于64,我将回退到顺序std::count_if功能.(0)
如果它大于或等于64,我将在线程池中生成一个在左半部分递归的作业,并在当前线程上计算该范围的右半部分.(1)
我将使用原子共享int来"等待"计算两半.(2)
我将使用原子共享int来累积部分结果.(3)
简化代码:
auto async_count_if(auto begin, auto end, auto predicate, auto continuation)
{
// (0) Base case:
if(end - begin < 64)
{
continuation(std::count_if(begin, end, predicate));
return;
}
// (1) Recursive case:
auto counter = make_shared<atomic<int>>(2); // (2)
auto cleanup = [=, accumulator = make_shared<atomic<int>>(0) /*(3)*/]
(int partial_result)
{
*accumulator += partial_result;
if(--*counter == 0)
{
continuation(*accumulator);
}
};
const auto mid = std::next(i_begin, sz / 2);
post_in_thread_pool([=]
{
async_count_if(i_begin, mid, predicate, cleanup);
});
async_count_if(mid, i_end, predicate, cleanup);
}
Run Code Online (Sandbox Code Playgroud)
然后可以按如下方式使用代码:
std::vector<int> v(512);
std::iota(std::begin(v), std::end(v), 0);
async_count_if{}(std::begin(v), std::end(v),
/* predicate */ [](auto x){ return x < 256; },
/* continuation */ [](auto res){ std::cout << res << std::endl; });
Run Code Online (Sandbox Code Playgroud)
上面代码中的问题是auto cleanup.因为auto将推导出cleanuplambda的每个实例化的唯一类型,并且由于按值cleanup捕获cont...由于递归,将在编译时计算无限大的嵌套lambda类型,从而导致以下错误:
致命错误:递归模板实例化超过最大深度1024
从概念上讲,您可以想到正在构建的类型大致如下:
cont // user-provided continuation
cleanup0<cont> // recursive step 0
cleanup1<cleanup0<cont>> // recursive step 1
cleanup2<cleanup1<cleanup0<cont>>> // recursive step 2
// ...
Run Code Online (Sandbox Code Playgroud)
(!):请记住这async_count_if 只是一个例子,以显示我的真实情况的"树状"递归结构.我知道异步count_if可以用一个原子计数器和sz / 64任务来简单地实现.
我想避免错误,最小化任何可能的运行时或内存开销.
一种可能的解决方案是使用std::function<void(int)> cleanup,它允许代码编译和正确运行,但产生次优汇编并引入额外的动态分配.wandbox示例
std::size_t模板参数+特化来人为地限制async_count_if::operator()递归深度 - 不幸的是,这可能会使二进制大小膨胀并且非常不优雅.困扰我的是,当我打电话时,我知道范围的大小async_count_if:它是std::distance(i_begin, i_end).如果我知道范围的大小,我还可以推导出所需计数器和连续的数量:(2^k - 1),k递归树的深度在哪里.
因此,我认为应该有一种方法在第一次调用时预先计算"控制结构",async_count_if并通过引用将其传递给递归调用.这个"控制结构"可以包含足够的空间用于(2^k - 1)原子计数器和(2^k - 1)清理/继续功能.
遗憾的是我无法找到一种干净的方法来实现这一点,并决定在这里发布一个问题,因为在开发异步并行递归算法时似乎这个问题应该是常见的.
在不引入不必要开销的情况下处理此问题的优雅方法是什么?
我一定错过了一些非常明显的东西,但是为什么你需要多个计数器和结构?您可以预先计算迭代总数(如果您知道您的基本情况)并在所有迭代中与累加器一起共享它,la(必须稍微修改您的简化代码):
#include <algorithm>
#include <memory>
#include <vector>
#include <iostream>
#include <numeric>
#include <future>
using namespace std;
template <class T>
auto post_in_thread_pool(T&& work)
{
std::async(std::launch::async, work);
}
template <class It, class Pred, class Cont>
auto async_count_if(It begin, It end, Pred predicate, Cont continuation)
{
// (0) Base case:
if(end - begin <= 64)
{
continuation(std::count_if(begin, end, predicate));
return;
}
const auto sz = std::distance(begin, end);
const auto mid = std::next(begin, sz / 2);
post_in_thread_pool([=]
{
async_count_if(begin, mid, predicate, continuation);
});
async_count_if(mid, end, predicate, continuation);
}
template <class It, class Pred, class Cont>
auto async_count_if_facade(It begin, It end, Pred predicate, Cont continuation)
{
// (1) Recursive case:
const auto sz = std::distance(begin, end);
auto counter = make_shared<atomic<int>>(sz / 64); // (fix this for mod 64 !=0 cases)
auto cleanup = [=, accumulator = make_shared<atomic<int>>(0) /*(3)*/]
(int partial_result)
{
*accumulator += partial_result;
if(--*counter == 0)
{
continuation(*accumulator);
}
};
return async_count_if(begin, end, predicate, cleanup);
}
int main ()
{
std::vector<int> v(1024);
std::iota(std::begin(v), std::end(v), 0);
async_count_if_facade(std::begin(v), std::end(v),
/* predicate */ [](auto x){ return x > 1000; },
/* continuation */ [](const auto& res){ std::cout << res << std::endl; });
}
Run Code Online (Sandbox Code Playgroud)
一些演示
| 归档时间: |
|
| 查看次数: |
327 次 |
| 最近记录: |