sta*_*ant 36 parallel-processing r data.table
在这篇文章之后:R中的multicore和data.table,我想知道在使用data.table时是否有办法使用所有核心,通常按组进行计算可以并行化.似乎plyr
允许通过设计进行此类操作.
Mat*_*wle 45
首先要检查的是data.table
FAQ 3.1第2点已经陷入困境:
仅为最大的组进行一次内存分配,然后将该内存重新用于其他组.收集的垃圾很少.
这就是data.table分组快速的原因之一.但这种方法不适合并行化.并行化意味着将数据复制到其他线程,而不是花费时间.但是,我的理解是,data.table
分组通常比快plyr
与.parallel
上反正.它取决于每个组的任务的计算时间,以及该计算时间是否可以轻松减少.移动数据通常占主导地位(在对大型数据任务进行1或3次基准测试时).
更常见的是,到目前为止,实际上有一些问题在于j
表达方式[.data.table
.例如,最近我们看到data.table
分组表现不佳,但罪魁祸首证明是min(POSIXct)
(聚合R超过80K的唯一ID).避免这种问题产生超过50倍的速度.
因此,口头禅是: Rprof
,Rprof
,Rprof
.
此外,来自同一FAQ的第1点可能很重要:
只有该列被分组,其他19被忽略,因为data.table检查j表达式并意识到它不使用其他列.
所以,data.table
真的不遵循split-apply-combine范式.它的工作方式不同 split-apply-combine有助于并行化,但它实际上不能扩展到大数据.
另请参阅data.table介绍插图中的脚注3:
我们想知道有多少人正在将并行技术部署到矢量扫描的代码中
那是试图说"确定,并行速度明显更快,但使用高效算法需要多长时间?".
但是如果您已经分析(使用Rprof
),并且每个组的任务确实是计算密集型的,那么数据表帮助中的3个帖子(包括单词"multicore")可能会有所帮助:
当然,有许多任务可以在data.table中实现并行化,并且有一种方法可以做到.但它还没有完成,因为通常其他因素都会受到影响,因此它的优先级较低.如果您可以使用基准和Rprof结果发布可重现的虚拟数据,这将有助于提高优先级.
我已经根据@matt dowle先前的Rprof,Rprof,Rprof的口号进行了一些测试.
我发现并行化的决定取决于上下文; 但可能很重要.根据测试操作(例如foo
下面,可以自定义)和使用的核心数量(我尝试8和24),我得到不同的结果.
我还看一些真实世界(不可共享)的数据/操作,这些数据/操作显示了24个内核的更大(33%
或25%
两个不同的测试)改进.编辑2018年5月一组新的实际案例显示,1000个并行操作的改进率接近85%.
R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods
[8] base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4
R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4
attached base packages:
[1] parallel stats graphics grDevices utils datasets methods base
other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4
Run Code Online (Sandbox Code Playgroud)
library(data.table)
library(stringi)
library(microbenchmark)
set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)
my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)
foo <- function(dt) {
dt2 <- dt ## needed for .SD lock
nr <- nrow(dt2)
idx <- sample.int(nr, 1, replace=FALSE)
dt2[idx,][, `:=` (
new_var1= V1 / V2,
new_var2= V4 * V3 / V10,
new_var3= sum(V12),
new_var4= ifelse(V10 > 0, V11 / V13, 1),
new_var5= ifelse(V9 < 0, V8 / V18, 1)
)]
return(dt2[idx,])
}
split_df <- function(d, var) {
base::split(d, get(var, as.environment(d)))
}
foo2 <- function(dt) {
dt2 <- split_df(dt, "grps")
require(parallel)
cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
clusterExport(cl, varlist= "foo")
clusterExport(cl, varlist= "dt2", envir = environment())
clusterEvalQ(cl, library("data.table"))
dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)
parallel::stopCluster(cl)
return(rbindlist(dt2))
}
print(parallel::detectCores()) # 8
microbenchmark(
serial= dt[,foo(.SD), by= "grps"],
parallel= foo2(dt),
times= 10L
)
Unit: seconds
expr min lq mean median uq max neval cld
serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b
parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a
print(parallel::detectCores()) # 24
Unit: seconds
expr min lq mean median uq max neval cld
serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a
parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a
Run Code Online (Sandbox Code Playgroud)
我们可以使用此答案对@matt dowle对分析的原始评论提供更直接的响应.
因此,我们确实看到大部分计算时间都是由处理base
而不是处理data.table
.data.table
正如预期的那样,操作本身非常快.虽然有些人可能认为这证明内部不需要并行性data.table
,但我认为这个工作流程/操作集并不是非典型的.也就是说,我强烈怀疑大多数大型data.table
聚合涉及大量非data.table
代码; 并且这与交互式使用与开发/生产使用相关.因此,我得出结论,并行性data.table
对于大型聚合来说是有价值的.
library(profr)
prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
simplify = FALSE)
pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}
sort(sapply(fun_timing, sum)) # no large outliers
fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
ret <- data.table(fun= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
ret <- data.table(pkg= names(x), time= x)
ret[, pct_time := time / sum(time)]
return(ret)
}))
fun_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "fun"][
order(avg_time, decreasing = TRUE),][1:10,]
pkg_timing2[, .(total_time= sum(time),
avg_time= mean(time),
avg_pct= round(mean(pct_time), 4)), by= "pkg"][
order(avg_time, decreasing = TRUE),]
Run Code Online (Sandbox Code Playgroud)
结果:
fun total_time avg_time avg_pct
1: base::[ 670.362 6.70362 0.2694
2: NA::[.data.table 667.350 6.67350 0.2682
3: .GlobalEnv::foo 335.784 3.35784 0.1349
4: base::[[ 163.044 1.63044 0.0655
5: base::[[.data.frame 133.790 1.33790 0.0537
6: base::%in% 120.512 1.20512 0.0484
7: base::sys.call 86.846 0.86846 0.0348
8: NA::replace_dot_alias 27.824 0.27824 0.0112
9: base::which 23.536 0.23536 0.0095
10: base::sapply 22.080 0.22080 0.0089
pkg total_time avg_time avg_pct
1: base 1397.770 13.97770 0.7938
2: .GlobalEnv 335.784 3.35784 0.1908
3: data.table 27.262 0.27262 0.0155
Run Code Online (Sandbox Code Playgroud)