doParallel(包)foreach 不适用于 R 中的大迭代

989*_*989 5 parallel-processing r parallel-foreach doparallel

我正在分别具有 4 个和 8 个物理和逻辑内核的 PC(OS Linux)上运行以下代码(从doParallel 的 Vignettes 中提取)。

运行代码iter=1e+6或更少,一切都很好,我可以从 CPU 使用率中看到所有内核都用于此计算。然而,随着迭代次数的增多(例如iter=4e+6),在这种情况下并行计算似乎不起作用。当我还监视 CPU 使用率时,只有一个核心参与计算(100% 使用率)。

示例 1

require("doParallel")
require("foreach")
registerDoParallel(cores=8)
x <- iris[which(iris[,5] != "setosa"), c(1,5)]
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        ind <- sample(100, 100, replace=TRUE)
        result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
        coefficients(result1)
    }
})[3]
Run Code Online (Sandbox Code Playgroud)

你知道可能是什么原因吗?记忆可能是原因吗?

我四处搜索,发现与我的问题有关,但重点是我没有出现任何错误,而且 OP 似乎通过在内部提供必要的包来提出解决方案foreach循环。但是可以看出,我的循环中没有使用任何包。

更新1

我的问题还是没有解决。根据我的实验,我不认为记忆可能是原因。我在运行以下简单并行(在所有 8 个逻辑内核上)迭代的系统上有 8GB 内存:

例2

require("doParallel")
require("foreach")

registerDoParallel(cores=8)
iter=4e+6
ptime <- system.time({
    r <- foreach(i=1:iter, .combine=rbind) %dopar% {
        i
    }
})[3]
Run Code Online (Sandbox Code Playgroud)

运行此代码没有问题,但是当我监视 CPU 使用率时,只有一个内核(8 个内核)是 100%。

更新2

至于示例2,@SteveWeston(感谢您指出这一点)表示(在评论中):“您更新中的示例正在遭受小任务的困扰。只有主人有任何真正的工作要做,包括发送任务和处理结果。这与原始示例的问题根本不同,原始示例确实在较少次数的迭代中使用了多个内核。”

但是,示例 1仍未解决。当我运行它并使用 监视进程时htop,以下是更详细的情况:

让我们p1通过.name 命名所有 8 个创建的进程p8。状态(列Shtop了)p1R意思是它的运行和保持不变。然而,对于p2高达p8,有些分钟后,状态变为D(即不间断睡眠),也有人分钟后,再次变为Z(即终止,但不会被其父收获)。你知道为什么会这样吗?

Ste*_*ton 5

我认为你的内存不足。这是该示例的修改版本,当您有许多任务时,它应该会更好地工作。它使用 doSNOW 而不是 doParallel,因为 doSNOW 允许您使用 combine 函数处理工作人员返回的结果。此示例将这些结果写入文件以使用更少的内存,但是它在最后使用“.final”函数将结果读回内存,但如果您没有足够的内存,则可以跳过该步骤。

library(doSNOW)
library(tcltk)
nw <- 4  # number of workers
cl <- makeSOCKcluster(nw)
registerDoSNOW(cl)

x <- iris[which(iris[,5] != 'setosa'), c(1,5)]
niter <- 15e+6
chunksize <- 4000  # may require tuning for your machine
maxcomb <- nw + 1  # this count includes fobj argument
totaltasks <- ceiling(niter / chunksize)

comb <- function(fobj, ...) {
  for(r in list(...))
    writeBin(r, fobj)
  fobj
}

final <- function(fobj) {
  close(fobj)
  t(matrix(readBin('temp.bin', what='double', n=niter*2), nrow=2))
}

mkprogress <- function(total) {
  pb <- tkProgressBar(max=total,
                      label=sprintf('total tasks: %d', total))
  function(n, tag) {
    setTkProgressBar(pb, n,
      label=sprintf('last completed task: %d of %d', tag, total))
  }
}
opts <- list(progress=mkprogress(totaltasks))
resultFile <- file('temp.bin', open='wb')

r <-
  foreach(n=idiv(niter, chunkSize=chunksize), .combine='comb',
          .maxcombine=maxcomb, .init=resultFile, .final=final,
          .inorder=FALSE, .options.snow=opts) %dopar% {
    do.call('c', lapply(seq_len(n), function(i) {
      ind <- sample(100, 100, replace=TRUE)
      result1 <- glm(x[ind,2]~x[ind,1], family=binomial(logit))
      coefficients(result1)
    }))
  }
Run Code Online (Sandbox Code Playgroud)

我包含了一个进度条,因为这个例子需要几个小时来执行。

请注意,此示例还使用包中的idiv函数iterators来增加每个任务的工作量。这种技术称为分块,通常可以提高并行性能。但是,使用会idiv弄乱任务索引,因为变量i现在是每个任务的索引而不是全局索引。对于全局索引,您可以编写一个自定义迭代器来包装idiv

idivix <- function(n, chunkSize) {
  i <- 1
  it <- idiv(n, chunkSize=chunkSize)
  nextEl <- function() {
    m <- nextElem(it)  # may throw 'StopIterator'
    value <- list(i=i, m=m)
    i <<- i + m
    value
  }
  obj <- list(nextElem=nextEl)
  class(obj) <- c('abstractiter', 'iter')
  obj
}
Run Code Online (Sandbox Code Playgroud)

此迭代器发出的值是列表,每个列表包含一个起始索引和一个计数。这是一个使用此自定义迭代器的简单 foreach 循环:

r <- 
  foreach(a=idivix(10, chunkSize=3), .combine='c') %dopar% {
    do.call('c', lapply(seq(a$i, length.out=a$m), function(i) {
      i
    }))
  }
Run Code Online (Sandbox Code Playgroud)

当然,如果任务的计算强度足够大,您可能不需要分块,可以使用原始示例中的简单 foreach 循环。