data.table和并行计算

sta*_*ant 36 parallel-processing r data.table

在这篇文章之后:R中的multicore和data.table,我想知道在使用data.table时是否有办法使用所有核心,通常按组进行计算可以并行化.似乎plyr允许通过设计进行此类操作.

Mat*_*wle 45

首先要检查的是data.tableFAQ 3.1第2点已经陷入困境:

仅为最大的组进行一次内存分配,然后将该内存重新用于其他组.收集的垃圾很少.

这就是data.table分组快速的原因之一.但这种方法不适合并行化.并行化意味着将数据复制到其他线程,而不是花费时间.但是,我的理解是,data.table分组通常比快plyr.parallel上反正.它取决于每个组的任务的计算时间,以及该计算时间是否可以轻松减少.移动数据通常占主导地位(在对大型数据任务进行1或3次基准测试时).

更常见的是,到目前为止,实际上有一些问题在于j表达方式[.data.table.例如,最近我们看到data.table分组表现不佳,但罪魁祸首证明是min(POSIXct)(聚合R超过80K的唯一ID).避免这种问题产生超过50倍的速度.

因此,口头禅是: Rprof,Rprof,Rprof.

此外,来自同一FAQ的第1点可能很重要:

只有该列被分组,其他19被忽略,因为data.table检查j表达式并意识到它不使用其他列.

所以,data.table真的不遵循split-apply-combine范式.它的工作方式不同 split-apply-combine有助于并行化,但它实际上不能扩展到大数据.

另请参阅data.table介绍插图中的脚注3:

我们想知道有多少人正在将并行技术部署到矢量扫描的代码中

那是试图说"确定,并行速度明显更快,但使用高效算法需要多长时间?".

但是如果您已经分析(使用Rprof),并且每个组的任务确实是计算密集型的,那么数据表帮助中的3个帖子(包括单词"multicore")可能会有所帮助:

datatable-help上的多核帖子

当然,有许多任务可以在data.table中实现并行化,并且有一种方法可以做到.但它还没有完成,因为通常其他因素都会受到影响,因此它的优先级较低.如果您可以使用基准和Rprof结果发布可重现的虚拟数据,这将有助于提高优先级.

  • 注意[这里](http://stackoverflow.com/a/9808657/1385941)提出的观点可能很有用,尤其是拆分发生在单个核心上,然后该函数并行应用于数据 - 一些严重的开销! (2认同)
  • @ user1617979请在R提示符下键入`?Rprof`并尝试一下。 (2认同)

Ale*_*x W 6

我已经根据@matt dowle先前的Rprof,Rprof,Rprof的口号进行了一些测试.

我发现并行化的决定取决于上下文; 但可能很重要.根据测试操作(例如foo下面,可以自定义)和使用的核心数量(我尝试8和24),我得到不同的结果.

结果如下:

  1. 使用8个内核,我看到这个示例在并行化方面21%的改进
  2. 使用24核,我看到14%的改进.

我还看一些真实世界(不可共享)的数据/操作,这些数据/操作显示了24个内核的更大(33%25%两个不同的测试)改进.编辑2018年5月一组新的实际案例显示,1000个并行操作的改进率接近85%.

R> sessionInfo() # 24 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods
[8] base

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.2          data.table_1.10.4

R> sessionInfo() # 8 core machine:
R version 3.3.2 (2016-10-31)
Platform: x86_64-apple-darwin13.4.0 (64-bit)
Running under: macOS Sierra 10.12.4

attached base packages:
[1] parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] microbenchmark_1.4-2.1 stringi_1.1.5          data.table_1.10.4     
Run Code Online (Sandbox Code Playgroud)

示例如下:

library(data.table)
library(stringi)
library(microbenchmark)

set.seed(7623452L)
my_grps <- stringi::stri_rand_strings(n= 5000, length= 10)

my_mat <- matrix(rnorm(1e5), ncol= 20)
dt <- data.table(grps= rep(my_grps, each= 20), my_mat)

foo <- function(dt) {
  dt2 <- dt ## needed for .SD lock
  nr <- nrow(dt2)

  idx <- sample.int(nr, 1, replace=FALSE)

  dt2[idx,][, `:=` (
    new_var1= V1 / V2,
    new_var2= V4 * V3 / V10,
    new_var3= sum(V12),
    new_var4= ifelse(V10 > 0, V11 / V13, 1),
    new_var5= ifelse(V9 < 0, V8 / V18, 1)
  )]


  return(dt2[idx,])
}

split_df <- function(d, var) {
  base::split(d, get(var, as.environment(d)))
}

foo2 <- function(dt) {
  dt2 <- split_df(dt, "grps")

  require(parallel)
  cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores()))
  clusterExport(cl, varlist= "foo")
  clusterExport(cl, varlist= "dt2", envir = environment())
  clusterEvalQ(cl, library("data.table"))

  dt2 <- parallel::parLapply(cl, X= dt2, fun= foo)

  parallel::stopCluster(cl)
  return(rbindlist(dt2))
}

print(parallel::detectCores()) # 8

microbenchmark(
  serial= dt[,foo(.SD), by= "grps"],
  parallel= foo2(dt),
  times= 10L
)

Unit: seconds
     expr      min       lq     mean   median       uq      max neval cld
   serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387    10   b
 parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257    10  a 

print(parallel::detectCores()) # 24

Unit: seconds
     expr       min        lq     mean   median       uq      max neval cld
   serial  9.014247  9.804112 12.17843 13.17508 13.56914 14.13133    10   a
 parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353    10   a
Run Code Online (Sandbox Code Playgroud)

剖析:

我们可以使用此答案对@matt dowle对分析的原始评论提供更直接的响应.

因此,我们确实看到大部分计算时间都是由处理base而不是处理data.table.data.table正如预期的那样,操作本身非常快.虽然有些人可能认为这证明内部不需要并行性data.table,但我认为这个工作流程/操作集并不是非典型的.也就是说,我强烈怀疑大多数大型data.table聚合涉及大量非data.table代码; 并且这与交互式使用与开发/生产使用相关.因此,我得出结论,并行性data.table对于大型聚合来说是有价值的.

library(profr)

prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002),
                       simplify = FALSE)

pkg_timing <- fun_timing <- vector("list", length= 100)
for (i in 1:100) {
  fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum)
  pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum)
}

sort(sapply(fun_timing, sum)) #  no large outliers

fun_timing2 <- rbindlist(lapply(fun_timing, function(x) {
  ret <- data.table(fun= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) {
  ret <- data.table(pkg= names(x), time= x)
  ret[, pct_time := time / sum(time)]
  return(ret)
}))

fun_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "fun"][
  order(avg_time, decreasing = TRUE),][1:10,]

pkg_timing2[, .(total_time= sum(time),
                avg_time= mean(time),
                avg_pct= round(mean(pct_time), 4)), by= "pkg"][
  order(avg_time, decreasing = TRUE),]
Run Code Online (Sandbox Code Playgroud)

结果:

                      fun total_time avg_time avg_pct
 1:               base::[    670.362  6.70362  0.2694
 2:      NA::[.data.table    667.350  6.67350  0.2682
 3:       .GlobalEnv::foo    335.784  3.35784  0.1349
 4:              base::[[    163.044  1.63044  0.0655
 5:   base::[[.data.frame    133.790  1.33790  0.0537
 6:            base::%in%    120.512  1.20512  0.0484
 7:        base::sys.call     86.846  0.86846  0.0348
 8: NA::replace_dot_alias     27.824  0.27824  0.0112
 9:           base::which     23.536  0.23536  0.0095
10:          base::sapply     22.080  0.22080  0.0089

          pkg total_time avg_time avg_pct
1:       base   1397.770 13.97770  0.7938
2: .GlobalEnv    335.784  3.35784  0.1908
3: data.table     27.262  0.27262  0.0155
Run Code Online (Sandbox Code Playgroud)

github/data.table中交叉发布