Dna*_*iel 15 r mclapply data.table
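I have a large codebase, and the aggregation step is the current speed bottleneck.
In my code I would like to speed up the data grouping step. A SNOTE (simple non-trivial example) of my data looks like this: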
library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
# `by` is an argument of `[`, not an element of the list() in j
system.time(c.dt <- dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a])
#    user  system elapsed
#  60.107   3.143  63.534
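That is quite fast for such a large data example, but in my case I am still looking for further speed-ups. I have multiple cores available, so I am almost sure there must be a way to use that computing power.
I am open to changing my data type to a data.frame or idata.frame object (in theory, idata.frame is supposed to be faster than data.frame).
I did some research, and it seems the plyr package has some parallel capabilities that could help, but I am still struggling with how to apply them to the grouping I am trying to do. Another SO post discusses some of these ideas. Because it uses the foreach function, I am still unsure how much I would gain from this kind of parallelization. In my experience, foreach is not a good idea for millions of fast operations, because the communication overhead between cores ends up slowing down the parallelized work.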
Dan*_*ega 12
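Can you parallelize aggregation with data.table? Yes.
Is it worth it? No. This is a key point that the previous answer failed to highlight.
As Matt Dowle explains in "data.table and parallel computing", when operations run in parallel, copies ("chunks") of the data have to be made before they are distributed. This slows things down. In some cases, when you cannot use data.table (for example, when running many linear regressions), it is worth splitting the task across cores. But not for aggregation, at least when data.table is involved.
In short (and until proven otherwise), aggregate using data.table and stop worrying about potential speed gains from doMC. data.table is already extremely fast compared with anything else available for aggregation, even though it is not multi-core!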
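To make the "chunks" idea concrete, here is a minimal sketch of chunk-level parallel aggregation. It is an illustration under assumptions, not a recommended speed-up: the names n_chunks and key_chunks are hypothetical, the table is a reduced version of the test data, and the sketch only shows that the chunked result matches the plain by aggregation.

```r
library(data.table)
library(parallel)

set.seed(1)
N <- 1e4
dt <- data.table(
  a = sample(1:N, N * 2, replace = TRUE),
  b = sample(c("3m", "2m2d2m", "5m", "4m"), N * 2, replace = TRUE),
  key = "a"
)

# Assumption: split the sorted group keys into n_chunks contiguous blocks
n_chunks <- 2L
keys <- unique(dt[["a"]])
key_chunks <- split(keys, cut(seq_along(keys), n_chunks, labels = FALSE))

# Forking is unavailable on Windows, so fall back to one core there
cores <- if (.Platform$OS.type == "windows") 1L else n_chunks

# Each worker aggregates its own block of keys with `by`
chunked <- rbindlist(mclapply(key_chunks, function(k) {
  dt[.(k), list(b = paste(b, collapse = "")), by = a]
}, mc.cores = cores))

# Reference: the single-process aggregation
direct <- dt[, list(b = paste(b, collapse = "")), by = a]
```

Both tables should contain the same groups in the same order. Whether the chunked version is ever faster depends on the data and the machine, and the timings reported in this answer suggest it usually is not.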
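Below are some benchmarks you can run yourself, comparing data.table's internal aggregation (its by argument) against foreach and mclapply. The results are listed first.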
#-----------------------------------------------
# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1) 0.007 -- data.table using `by`
# (2) 3.548 -- mclapply with rbindlist
# (3) 5.557 -- foreach with rbindlist
# (4) 5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply
# ----------------------------------------------
library(data.table)
library(parallel)  # provides mclapply, used further down
## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")
# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.table's `by` to aggregate
round(rowMeans(replicate(3, system.time({
  dt[, list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1]), by=a]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
## using `lapply`
round(rowMeans(replicate(3, system.time({
  results <- lapply(unique(dt[["a"]]), function(x) {
    dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
  })
  rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000
# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
  results <- mclapply(unique(dt[["a"]]),
    function(x) {
      dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
  rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000
# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4
## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000
## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
  results <-
    foreach(x=unique(dt[["a"]])) %dopar%
      dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
  rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000
registerDoSEQ()
getDoParWorkers()
# [1] 1
If you have multiple cores available, why not take advantage of the fact that you can quickly filter and group rows in a data.table using its key:
library(doMC)
registerDoMC(cores=4)
setkey(dt, "a")
finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar%
  dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
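Note that if the number of unique groups (i.e. length(unique(a))) is relatively small, it is faster to drop the .combine argument, get the results back in a list, and then call rbindlist on the results. In my tests on two cores and 8GB of RAM, the threshold was at about 9,000 unique values. Here is what I used to benchmark: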
# (option a)
round(rowMeans(replicate(3, system.time({
  # ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
  # ------- #
}))), 3)
# [1] 1.243 elapsed for N == 1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000
# (option b)
round(rowMeans(replicate(3, system.time({
  # ------- #
  results <-
    foreach(x=unique(dt[["a"]])) %dopar%
      dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
  rbindlist(results)
  # ------- #
}))), 3)
# [1] 1.117 elapsed for N == 1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000
## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
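The two options benchmarked above differ only in how the per-group results are combined. The following sequential sketch needs no foreach backend. The toy table and the names pieces, one_by_one and all_at_once are illustrative assumptions. It shows that binding the pieces step by step and binding them in a single rbindlist call produce the same rows, so the choice between the two is purely about speed:

```r
library(data.table)

# Toy keyed table; the values are made up for illustration
toy <- data.table(a = c(1L, 1L, 2L, 3L, 3L),
                  b = c("3m", "5m", "4m", "9m", "1m"),
                  key = "a")

# One small data.table per group, similar to what each iteration returns
pieces <- lapply(unique(toy[["a"]]), function(x) {
  toy[.(x), list(a = a[1], b = paste(b, collapse = ""))]
})

# Strategy 1: bind the pieces pairwise with rbind
one_by_one <- Reduce(rbind, pieces)

# Strategy 2: collect everything in a list, then bind once
all_at_once <- rbindlist(pieces)

print(all_at_once)
```

This does not reproduce how foreach schedules its combine calls internally. It only illustrates that the output is the same either way.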