989*_*989 5 parallel-processing r parallel-foreach doparallel
我正在分别具有 4 个和 8 个物理和逻辑内核的 PC(OS Linux)上运行以下代码(从doParallel 的 Vignettes 中提取)。
运行代码iter=1e+6或更少,一切都很好,我可以从 CPU 使用率中看到所有内核都用于此计算。然而,随着迭代次数的增多(例如iter=4e+6),在这种情况下并行计算似乎不起作用。当我还监视 CPU 使用率时,只有一个核心参与计算(100% 使用率)。
示例 1
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}
})[3]
Run Code Online (Sandbox Code Playgroud)
你知道可能是什么原因吗?记忆可能是原因吗?
我四处搜索,发现这与我的问题有关,但重点是我没有出现任何错误,而且 OP 似乎通过在内部提供必要的包来提出解决方案foreach循环。但是可以看出,我的循环中没有使用任何包。
更新1
我的问题还是没有解决。根据我的实验,我不认为记忆可能是原因。我在运行以下简单并行(在所有 8 个逻辑内核上)迭代的系统上有 8GB 内存:
例2
require("doParallel")
require("foreach")
registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
r <- foreach(i=1:iter, .combine=rbind) %dopar% {
i
}
})[3]
Run Code Online (Sandbox Code Playgroud)
运行此代码没有问题,但是当我监视 CPU 使用率时,只有一个内核(8 个内核)是 100%。
更新2
至于示例2,@SteveWeston(感谢您指出这一点)表示(在评论中):“您更新中的示例正在遭受小任务的困扰。只有主人有任何真正的工作要做,包括发送任务和处理结果。这与原始示例的问题根本不同,原始示例确实在较少次数的迭代中使用了多个内核。”
但是,示例 1仍未解决。当我运行它并使用 监视进程时htop,以下是更详细的情况:
让我们p1通过.name 命名所有 8 个创建的进程p8。状态(列S在htop了)p1的R意思是它的运行和保持不变。然而,对于p2高达p8,有些分钟后,状态变为D(即不间断睡眠),也有人分钟后,再次变为Z(即终止,但不会被其父收获)。你知道为什么会这样吗?
我认为你的内存不足。这是该示例的修改版本,当您有许多任务时,它应该会更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您使用 combine 函数处理工作人员返回的结果。此示例将这些结果写入文件以使用更少的内存,但是它在最后使用“.final”函数将结果读回内存,但如果您没有足够的内存,则可以跳过该步骤。
library(doSNOW)
library(tcltk)
nw <- 4 # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)
x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000 # may require tuning for your machine
maxcomb <- nw + 1 # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)
comb <- function(fobj, ...) {
for(r in list(...))
writeBin(r, fobj)
fobj
}
final <- function(fobj) {
close(fobj)
t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}
mkprogress <- function(total) {
pb <- tkProgressBar(max=total,
label=sprintf('total tasks: %d', total))
function(n, tag) {
setTkProgressBar(pb, n,
label=sprintf('last completed task: %d of %d', tag, total))
}
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')
r <-
foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
.maxcombine=maxcomb, .init=resultFile, .final=final,
.inorder=FALSE, .options.snow=opts) %dopar% {
do.call('c', lapply(seq_len(n), function(i) {
ind <- sample(100, 100, replace=TRUE)
result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
coefficients(result1)
}))
}
Run Code Online (Sandbox Code Playgroud)
我包含了一个进度条,因为这个例子需要几个小时来执行。
请注意,此示例还使用包中的idiv函数iterators来增加每个任务的工作量。这种技术称为分块,通常可以提高并行性能。但是,使用会idiv弄乱任务索引,因为变量i现在是每个任务的索引而不是全局索引。对于全局索引,您可以编写一个自定义迭代器来包装idiv:
idivix <- function(n, chunkSize) {
i <- 1
it <- idiv(n, chunkSize=chunkSize)
nextEl <- function() {
m <- nextElem(it) # may throw 'StopIterator'
value <- list(i=i, m=m)
i <<- i + m
value
}
obj <- list(nextElem=nextEl)
class(obj) <- c('abstractiter', 'iter')
obj
}
Run Code Online (Sandbox Code Playgroud)
此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:
r <-
foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
i
}))
}
Run Code Online (Sandbox Code Playgroud)
当然,如果任务的计算强度足够大,您可能不需要分块,可以使用原始示例中的简单 foreach 循环。