我有一个相当简单的问题,但我无法找到一个优雅的解决方案.
我有一个Thrust代码,它生成c
包含值的相同大小的向量.假设这些c
向量中的每一个都有一个索引.我想为每个向量位置获取c
值为最低的向量的索引:
例:
C0 = (0,10,20,3,40)
C1 = (1,2 ,3 ,5,10)
Run Code Online (Sandbox Code Playgroud)
我会得到一个包含C
具有最低值的向量索引的向量:
result = (0,1 ,1 ,0,1)
Run Code Online (Sandbox Code Playgroud)
我已经考虑过使用推力zip迭代器来做这件事,但是已经遇到了问题:我可以压缩所有c
向量并实现任意转换,它接受一个元组并返回其最低值的索引,但是:
10
元素,并且可以存在比10
c
矢量更多的元素.然后我考虑这样做:不是使用c
单独的向量,而是将它们全部附加到单个向量中C
,然后生成引用位置的键并按键执行稳定排序,这将从同一位置重新组合向量条目.在示例中,将给出:
C = (0,10,20,3,40,1,2,3,5,10)
keys = (0,1 ,2 ,3,4 ,0,1,2,3,4 )
after stable sort by key:
output = (0,1,10,2,20,3,3,5,40,10)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4 )
Run Code Online (Sandbox Code Playgroud)
然后使用向量中的位置生成键,使用向量的索引压缩输出,c
然后使用自定义函数执行按键缩减,对于每个缩减,输出具有最低值的索引.在示例中:
input = (0,1,10,2,20,3,3,5,40,10)
indexes= (0,1,0 ,1,0 ,1,0,1,0 ,1)
keys = (0,0,1 ,1,2 ,2,3,3,4 ,4)
after reduce by keys on zipped input and indexes:
output = (0,1,1,0,1)
Run Code Online (Sandbox Code Playgroud)
但是,如何通过键操作来编写这样的仿函数呢?
因为向量的长度必须相同。最好将它们连接在一起并将它们视为矩阵 C。
那么你的问题就变成了在行主矩阵中找到每列的最小元素的索引。可以如下解决。
在步骤1中,您建议使用stable_sort_by_key
重新排列元素顺序,这不是一个有效的方法。由于可以根据矩阵的 #row 和 #col 直接计算重排。简而言之,可以使用排列迭代器来完成:
thrust::make_permutation_iterator(
c.begin(),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
(_1 % row) * col + _1 / row)
)
Run Code Online (Sandbox Code Playgroud)
在第2步中,reduce_by_key
完全可以做你想做的事。在您的情况下,减少二元操作函子很容易,因为已经定义了对元组(压缩向量的元素)的比较来比较元组的第一个元素,并且它由推力支持
thrust::minimum< thrust::tuple<float, int> >()
Run Code Online (Sandbox Code Playgroud)
整个程序如下所示。因为我在奇特的迭代器中使用占位符,所以需要 Thrust 1.6.0+。
#include <iterator>
#include <algorithm>
#include <thrust/device_vector.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/transform_iterator.h>
#include <thrust/iterator/permutation_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/iterator/discard_iterator.h>
#include <thrust/reduce.h>
#include <thrust/functional.h>
using namespace thrust::placeholders;
int main()
{
const int row = 2;
const int col = 5;
float initc[] =
{ 0, 10, 20, 3, 40, 1, 2, 3, 5, 10 };
thrust::device_vector<float> c(initc, initc + row * col);
thrust::device_vector<float> minval(col);
thrust::device_vector<int> minidx(col);
thrust::reduce_by_key(
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / row),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0),
_1 / row) + row * col,
thrust::make_zip_iterator(
thrust::make_tuple(
thrust::make_permutation_iterator(
c.begin(),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), (_1 % row) * col + _1 / row)),
thrust::make_transform_iterator(
thrust::make_counting_iterator((int) 0), _1 % row))),
thrust::make_discard_iterator(),
thrust::make_zip_iterator(
thrust::make_tuple(
minval.begin(),
minidx.begin())),
thrust::equal_to<int>(),
thrust::minimum<thrust::tuple<float, int> >()
);
std::copy(minidx.begin(), minidx.end(), std::ostream_iterator<int>(std::cout, " "));
std::cout << std::endl;
return 0;
}
Run Code Online (Sandbox Code Playgroud)
剩下的两个问题可能会影响性能。
reduce_by_key
专为不同长度的段而设计,它可能不是减少相同长度段的最快算法。编写自己的内核可能是实现最高性能的最佳解决方案。