Dyl*_*ijk 8 performance r matrix
我有一个数据集,X
包含n
行和d
列。我想将每行的点积转置为自身。在 R 代码中,这将是x %*% t(x)
,这给出了一个d
byd
矩阵。
然后,我使用函数截断矩阵中的值:其中tau
是d
byd
矩阵。
trunc_operator = function(x, tau){
x = ifelse(abs(x) > tau, tau*sign(x), x)
return(x)
}
Run Code Online (Sandbox Code Playgroud)
我对每一行执行此操作,然后将它们按顺序添加在一起。
acf_mat = matrix(0, ncol(data), ncol(data))
for(i in 1:nrow(data)){
acf_mat = acf_mat + mat_prod_pairwise_trunc(data[i,], data[i,], tau = tau)
}
Run Code Online (Sandbox Code Playgroud)
我按顺序执行此操作的原因是,我不必存储许多d
矩阵d
,而一次只需存储一个矩阵,因此使用的内存较少。
该mat_prod_pairwise_trunc()
函数先进行点积,然后应用该trunc_operator
函数。
mat_prod_pairwise_trunc = function(x,y, tau){
trunc_operator((x %*% t(y))/2,tau)
}
Run Code Online (Sandbox Code Playgroud)
然而for循环很慢,我想提高速度。
到目前为止的想法
trunc_operator()
只需用C++编写函数即可执行此操作的有效方法是什么?
可重现的示例 下面是完整示例的代码:
# creating dataset
n = 200
d = 200
data = rt(n = n*d, df = 4.1)
data = matrix(data, nrow = n, ncol = d)
data_pairwise_fun = function(data){
n = nrow(data)
data_pairwise = matrix(nrow = (n*(n-1))/2, ncol = ncol(data))
for(i in 1:(n-1)){
for(j in 1:(n-i)){
data_pairwise[((i-1) * n) - (i*(i-1)/2) + j, ] = data[i,] - data[i + j,]
}
}
return(data_pairwise)
}
data = data_pairwise_fun(data)
# tau
t_tau_mat = function(df, delta, d, n){
off = (df/(df-2))^2
on = ((1/2)*((3*df^2)/(df - 2)*(df - 4))) + (3/2)*off
V_mat = diag(on - off, nrow = d, ncol = d)
V_mat = V_mat + off
V_mat = sqrt(V_mat)
V_mat * (sqrt(floor((n/2))/(2*log(d) + log(1/delta))))
}
tau = t_tau_mat(4.1,1/n,d,n)
# functions:
trunc_operator = function(x, tau){
x = ifelse(abs(x) > tau, tau*sign(x), x)
return(x)
}
mat_prod_pairwise_trunc = function(x,y, tau){
trunc_operator((x %*% t(y))/2,tau)
}
# Main job:
acf_mat = matrix(0, ncol(data), ncol(data))
for(i in 1:nrow(data)){
acf_mat = acf_mat + mat_prod_pairwise_trunc(data[i,], data[i,], tau = tau)
}
Run Code Online (Sandbox Code Playgroud)
这里有一些可以提高约 5 倍的技巧。
截断 运算符
这里最慢的部分是ifelse()
分别对每个元素进行操作的语句。我们可以通过对其进行向量化并使其同时对所有传递的元素进行操作来提高速度:
trunc = function(x, tau){
inds <- abs(x) > tau
x[inds] <- tau[inds] * sign(x[inds])
x
}
Run Code Online (Sandbox Code Playgroud)
外部产品
这里缓慢的部分是逐行执行操作。您的案例的行数多于列数。诀窍 - 通过对列进行操作获得相同的结果。
res = matrix(0, ncol(data), ncol(data))
tau2 = tau * 2
for(i in 1:ncol(data)) {
inds = i:ncol(data)
res[i,inds] = colSums(trunc(data[,inds,drop=FALSE] * data[,i], tau2[rep(i,nrow(data)),inds]))
}
res[lower.tri(res)] = t(res)[lower.tri(res)]
res = res / 2
Run Code Online (Sandbox Code Playgroud)
简短说明
想想结果是如何实现的。最终矩阵的行数和列数与数据中的列数相同。对于每一行,您都会进行外积,然后对这些积求和。
现在,考虑一下生成结果第一个元素的所有操作。它采用第一列的第一个元素,对其进行平方,然后对第一列的第二个元素进行平方,依此类推,对所有平方应用截断运算并将它们相加。
因此,我们可以通过执行以下操作来达到相同的结果:
res <- sum(trunc_operator(data[,1] * data[,1] / 2, tau))
Run Code Online (Sandbox Code Playgroud)
接下来,考虑如何在矩阵中计算第二个元素。如果是第一行,则它采用第一行中的第一个元素与第二个元素的乘积,加上第二行的第一个元素乘以第二行的第二个元素,等等,然后再次应用截断运算符并对结果。我们可以通过以下方式得到:
res <- sum(trunc_operator(data[,1] * data[,2] / 2, tau))
Run Code Online (Sandbox Code Playgroud)
但还有一个技巧。我们可以通过与整个矩阵相乘来一次获得每列的第一个元素:
res <- colSums(trunc_operator(data[,1] * data / 2, tau))
Run Code Online (Sandbox Code Playgroud)
我们看到的是,我们可以通过迭代列而不是行来构造矩阵。由于列数少于行数,因此速度应该更快。
但还有一个技巧。我们注意到所得矩阵始终是对称的。因此,在每次迭代之后,我们可以丢弃已计算列的结果,只存储上对角线的结果。然后最后恢复完整的矩阵。
对所有 3 个建议解决方案进行基准测试:
microbenchmark::microbenchmark(
original = {
res = matrix(0, ncol(data), ncol(data))
for(i in 1:nrow(data)) {
res = res + mat_prod_pairwise_trunc(data[i,], data[i,], tau = tau)
}
res
},
newProd = {
tau2 = tau * 2
res = matrix(0, d, d)
for (i in seq_len(nrow(dat)))
res = res + newProd(data[i, ], tau = tau2)
res = res / 2
res
},
byCol = {
res = matrix(0, ncol(data), ncol(data))
tau2 = tau * 2
for(i in 1:ncol(data)) {
inds = i:ncol(data)
res[i,inds] = colSums(trunc(data[,inds,drop=FALSE] * data[,i], tau2[rep(i,nrow(data)),inds]))
}
res[lower.tri(res)] = t(res)[lower.tri(res)]
res = res / 2
res
},
times = 2,
unit = "s",
check = "equal")
Unit: seconds
expr min lq mean median uq max neval cld
original 47.822215 47.822215 49.94679 49.94679 52.07136 52.07136 2 a
newProd 12.371246 12.371246 12.89065 12.89065 13.41006 13.41006 2 b
byCol 9.778319 9.778319 10.11568 10.11568 10.45303 10.45303 2 b
Run Code Online (Sandbox Code Playgroud)
这是一个Rcpp方法。在提供的数据集上花费不到一秒的时间:
Rcpp::cppFunction(
"std::vector<std::vector<double>>
funCpp(std::vector<std::vector<double>> &data,
std::vector<std::vector<double>> &tau){
int p = data.size(), n = data[0].size();
std::vector<std::vector<double>> results(p, std::vector<double>(p));
for(int i = 0; i < p; i++){
for(int j = i; j < p; j++){
double res = 0;
double _tau = tau[i][j];
for(int k = 0; k < n; k++){
double ans = data[i][k] * data[j][k]/2;
double sgn = (0 < ans) - (ans < 0);
res += ans * sgn > _tau ? _tau * sgn : ans;
}
results[i][j] = res;
results[j][i] = res;
}
}
return results;
}")
funR <- function(data, tau){
out <- funCpp(data.frame(data), data.frame(tau))
simplify2array(out)
}
s <- funR(data, tau)
all.equal(s, original)
[1] TRUE
Run Code Online (Sandbox Code Playgroud)
最终结果 的s
计算时间不到一秒。它比原始函数快约 25 倍。我无法使用microbenchmark
上面提供的答案,因为第一个答案不会产生与原始问题相同的结果。
您正在评估x %*% t(x)
而不是tcrossprod(x, NULL)
. 它们分别调用 BLAS 例程DGEMM
和DSYRK
,其中仅DSYRK
利用对称性。
所以你的第一步应该是tcrossprod(x, NULL)
在适当的地方打电话,而不是x %*% t(x)
甚至tcrossprod(x, x)
。
如果生成的代码对于您的目的来说仍然太慢,请根据您的硬件尝试不同的 BLAS 实现。但请注意,如果向量x
包含特殊值(即NaN
、NA_real_
、Inf
或 ) ,则不会使用外部 BLAS 实现-Inf
。
我在下面重新输入了您的示例,优化了一些设置代码并在样式和格式方面采取了一些自由(部分是为了在 Emacs 中突出显示语法)。最后有一个针对您的代码的基准。
set.seed(0)
n <- 200L
d <- 200L
df <- 4.1
mkDat <- function(x) {
n <- nrow(x)
if (n < 2L)
return(matrix(0, 0L, ncol(x)))
i1 <- rep.int(1L:(n - 1L), (n - 1L):1L)
i2 <- sequence.default((n - 1L):1L, 2L:n, 1L)
x[i1, ] - x[i2, ]
}
dat <- mkDat(matrix(rt(n * d, df), n, d))
mkTau <- function(df, delta, n, d) {
off <- (df/(df - 2))^2
on <- 0.5 * (3 * df^2)/(df - 2) * (df - 4) + 1.5 * off
sqrt(diag(on - off, d, d) + off) *
sqrt(floor(n / 2)/(2 * log(d) - log(delta)))
}
tau <- mkTau(df, 1/n, n, d)
oldProd <- function(x, y, tau) {
r <- (x %*% t(y)) / 2
ifelse(abs(r) > tau, tau * sign(r), r)
}
newProd <- function(x, y = NULL, tau) {
r <- tcrossprod(x, y)
if (length(i <- which(abs(r) > tau)))
r[i] <- tau[i] * sign(r[i])
r
}
dat.invsqrt2 <- dat / sqrt(2)
microbenchmark::microbenchmark(
oldLoop = {
res <- matrix(0, d, d)
for (i in seq_len(nrow(dat)))
res <- res + oldProd(dat[i, ], dat[i, ], tau = tau)
res
},
newLoop = {
res <- matrix(0, d, d)
for (i in seq_len(nrow(dat)))
res <- res + newProd(dat.invsqrt2[i, ], tau = tau)
res
},
times = 10L,
unit = "s",
check = "equal")
Run Code Online (Sandbox Code Playgroud)
Unit: seconds
expr min lq mean median uq max neval
oldLoop 15.422133 15.509987 15.626912 15.564334 15.681810 16.100875 10
newLoop 3.928236 4.021573 4.040398 4.053979 4.074112 4.087461 10
Run Code Online (Sandbox Code Playgroud)