如何在RcppParallel中调用用户定义的函数?

Alv*_*vin 6 c++ parallel-processing r rcpp

受艺术http://gallery.rcpp.org/articles/parallel-distance-matrix/的启发,我尝试使用RcppParallel在高维参数空间中运行强力搜索,以便使用多线程进行回测.我陷入了如何在部件中调用自定义函数的问题struct.这个想法是这样的:

首先,首先NumericMatrix params_mat在R中创建一个参数矩阵,然后使用带有List, NumericVector, CharacterVector数据类型的回测数据,例如List Data_1, NumericVector Data_2, CharacterVector Data_3, ...,对于每个参数场景都是静态的params_vec(注意它是行的params_mat).

接下来,定义返回测试函数,该函数输出包含3个关键变量的向量以评估策略性能.

这是我的一个例子params_mat,并Backtesting_Fun可以在R和RCPP,分别运行.

//[[Rcpp::export]]
NumericMatrix data_frame_rcpp(const Rcpp::List& list_params) 
{
  NumericMatrix res = list_params[0];
  return res;
}

# R codes to generate params_mat
params <- expand.grid (x_1=seq(1,100,1), x_2=seq(3,100,2), ..., x_n=seq(4,200,1));                           
list_params = list(ts(params));
tmp_params_data = data_frame_rcpp(list_params);                                              
params_mat = matrix(tmp_params_data, ncol = ncol(tmp_params_data), dimnames = NULL); 
params_vec = params_mat[ii,];

# User-defined Rcpp codes for backtesting
NumericVector Backtesting_Fun (List Data_1, NumericVector Data_2, CharacterVector Data_3, ..., NumericVector params_vec)
{
  // Main function parts to run backtesting for each params_vec scenario.
  ... etc

  // save 3 key result variables together with each params_vec (just a simple illustration).
  NumericVector res = NumericVector::create(params_vec[0],...,params_vec[size-1],
                                            key_1, key_2, key_3); 
  return res;
}
Run Code Online (Sandbox Code Playgroud)

当然,我们需要重写/修改原始RCPP Backtesting_Fun与RVector/RMatrix类型,然后使用下面的RcppParallel代码来调用Backtesting_Funstruct Backtest_parallel:

// [[Rcpp::depends(RcppParallel)]]
#include <RcppParallel.h>
using namespace RcppParallel;

RVector<double> Backtesting_Fun (const RVector<double> Data_1, const RVector<double> Data_2, 
                                const RVector<string> Data_3,..., const RVector<double> params_vec)
{
   // Main function parts to run backtesting for each params_vec scenario.
   ... etc;

   // save 3 key result variables together with each params_vec
   ... etc; 

   return res;
}

struct Backtest_parallel : public Worker 
{       
   // input matrix to read from
   const RVector<List> Data_1;
   const RVector<double> Data_2;
   const RVector<string> Data_3;
   ...
   const RMatrix<double> params_mat;

   // output matrix to write to
   RMatrix<double> rmat;

   // initialize from Rcpp input and output matrixes (the RMatrix class
   // can be automatically converted to from the Rcpp matrix type)
   Backtest_parallel(const List Data_1, const NumericVector Data_2, 
   const CharacterVector Data_3, ..., const NumericMatrix params_mat)
      : Data_1(Data_1), Data_2(Data_2), Data_3(Data_3), ..., params_mat(params_mat) {}

   // function call operator that work for the specified range (begin/end)
   void operator()(std::size_t begin, std::size_t end) 
   {
      for (std::size_t ii = begin; ii < end; i++) 
      {
         // params rows that we will operate on
         RMatrix<double>::Row params_row = params_mat.row(ii);

         // Run the backtesting function defined above
         RVector<double> res = Backtesting_Fun(Data_1, Data_2, ..., params_row)
         for (std::size_t jj = 0; jj < res.length(); jj++) 
         {
            // write to output matrix
            rmat(ii,jj) = res[jj];
         }
      }
   }
};

// [[Rcpp::export]]
NumericMatrix rcpp_parallel_backtest(List Data_1, NumericVector Data_2, CharacterVector Data_3,
                                     ..., NumericMatrix params_mat) 
{      
   // allocate the matrix we will return
   NumericMatrix rmat(params_mat.nrow(), params_mat.nrow()+3);

   // create the worker
   Backtest_parallel backtest_parallel(Data_1, Date_2, ..., params_mat);

   // call it with parallelFor
   parallelFor(0, rmat.nrow(), backtest_parallel);

   return rmat;
}
Run Code Online (Sandbox Code Playgroud)

这是我的问题:

  1. 可以RVector包含List数据类型,还是RcppParallel要包含任何特定容器List;

  2. Backtesting_Fun,输入应该是RVector/RMatrix类型,这是否意味着我们真的需要将原始Rcpp主代码NumericVector转换为RVector

或者,有没有更好的方法在RcppParallel中为我的案例进行并行计算?提前致谢.

编辑:

  1. 我看其他例子关于RcppPararrel http://gallery.rcpp.org/articles/parallel-matrix-transform/,http://gallery.rcpp.org/articles/parallel-inner-product/,共同理念in struct operator()是使用指针来操作数据输入operator(),所以有没有办法在我的情况下使用指针输入构建用户定义的函数?

  2. 如果上面的方法不起作用,它是可行的使用wrap转换RVector/RMatrix回RCPP数据类型,即NumericVector..operator()使输入类型的用户定义函数的Backtesting_Fun可保持不变.

Alv*_*vin 5

我想我可以找到一个替代办法来解决这个问题:关键是要使用一个线程安全的存取含有内变量struct,并保持RVector/ RMatrix在外面的主要功能,以便parallelFor能够很好地工作,这是最重要的部分这个平行的算法.这是我的方式:

  1. 摆脱List数据类型:相反,我们可以List使用NumericVector/ NumericMatrixcontainer 转换变量并记录其相应的索引,以便子向量/子矩阵将指向与列表元素相同的元素.

  2. 转换RVector/ RMatrixarma::vec/arma::mat:作为中提到RcppParallel Github上,C++ Armadillo是线程安全的结构的操作.在这里,我使用这个想法修改了使用RcppParallel并行距离矩阵计算中给出的示例,该概念几乎保持相同的测试速度.

    struct JsDistance : public Worker
    {
      const RMatrix<double> tmp_MAT;  // input matrix to read from
      RMatrix<double> tmp_rmat;       // output matrix to write to
      std::size_t row_size, col_size;
    
      // Convert global input/output into RMatrix/RVector type
      JsDistance(const NumericMatrix& matrix_input, NumericMatrix& matrix_output, 
                 std::size_t row_size, std::size_t col_size)
        : tmp_MAT(matrix_input), tmp_rmat(matrix_output), row_size(row_size), col_size(col_size) {}
    
      // convert RVector/RMatrix into arma type for Rcpp function
      // and the follwing arma data will be shared in parallel computing
      arma::mat convert()
      {
        RMatrix<double> tmp_mat = tmp_MAT;
        arma::mat MAT(tmp_mat.begin(), row_size, col_size, false);
        return MAT;
      }
    
    
      void operator()(std::size_t begin, std::size_t end)
      {
        for (std::size_t i = begin; i < end; i++)
        {
          for (std::size_t j = 0; j < i; j++)
          {
            // rows we will operate on
            arma::mat MAT = convert();
            arma::rowvec row1 = MAT.row(i);          // get the row of arma matrix
            arma::rowvec row2 = MAT.row(j);
    
            // compute the average using std::tranform from the STL
            std::vector<double> avg(row1.n_elem);
            std::transform(row1.begin(), row1.end(), // input range 1
                           row2.begin(),             // input range 2
                           avg.begin(),              // output range
                           average);                 // function to apply
    
            // calculate divergences
            double d1 = kl_divergence(row1.begin(), row1.end(), avg.begin());
            double d2 = kl_divergence(row2.begin(), row2.end(), avg.begin());
    
            // write to output matrix
            tmp_rmat(i,j) = sqrt(.5 * (d1 + d2));
          }
        }
      }
    };
    
    // [[Rcpp::export]]
    NumericMatrix rcpp_parallel_js_distance_modify(const Rcpp::NumericMatrix& matrix_input, int N_cores)
    {
      // allocate the matrix we will return
      NumericMatrix matrix_output(matrix_input.nrow(), matrix_input.nrow());
      std::size_t row_size = matrix_input.nrow();
      std::size_t col_size = matrix_input.ncol();
    
      // create the worker
      JsDistance jsDistance(matrix_input, matrix_output, row_size, col_size);
    
      // call it with parallelFor
      parallelFor(0, matrix_input.nrow(), jsDistance, matrix_input.nrow()/N_cores);           // parallelFor with grain size setting
    
      return matrix_output;
     }
    
     // Example compare: 
     n_row = 1E3;
     n_col = 1E2;
     m = matrix(runif(n_row*n_col), nrow = n_row, ncol = n_col);
     m = m/rowSums(m);
    
     res <- benchmark(rcpp_parallel_js_distance(m, 6),
             rcpp_parallel_js_distance_orignal(m, 6),
             order="relative")
     res[,1:4];
    
     #test                                    #elapsed   #relative
     rcpp_parallel_js_distance_orignal(m, 6)  128.069    1.000
     rcpp_parallel_js_distance(m, 6)          129.210    1.009
    
    Run Code Online (Sandbox Code Playgroud)

我们可以看到,其中的数据类型operator将是C++ arma,现在我们可以通过直接使用对象而不是指针来安全快速地调用我们的用户定义函数,这可能不是通用的或易于设计的.

现在,这个parallelFor结构将共享相同的数据源,而无需在并行计算中使用额外的副本,然后我们可以通过使用上述问题中提到的想法对回测进行一些略微更改.