有没有办法打破foreach循环?

Cor*_*nes 19 parallel-processing foreach r break

我使用R包foreach()%dopar%做并行长(〜天)计算.我希望能够在其中一个产生错误的情况下停止整个计算集.但是,我还没有找到实现这一目标的方法,从文档和各种论坛我发现没有迹象表明这是可能的.特别是,break()不起作用,stop()只停止当前计算,而不是整个foreach循环.

请注意,我不能使用简单的for循环,因为最终我想使用doRNG包并行化它.

下面是我在尝试的一个简化的,可重复的版本(串行这里显示%do%,不过我用的时候有同样的问题doRNG%dopar%).请注意,实际上我想并行运行此循环的所有元素(此处为10).

library(foreach)
myfunc <- function() {
  x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if(is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"
Run Code Online (Sandbox Code Playgroud)

我想要实现的是整个foreach循环可以在某些条件下立即退出(这里stop()遇到时).

我发现无法实现这一目标foreach.似乎我需要一种方法向所有其他进程发送消息以使它们也停止.

如果不可能foreach,有没有人知道替代品?我也试图用这个来实现这个目标parallel::mclapply,但这也不起作用.

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)

locale:
[1] C/UTF-8/C/C/C/C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods base

other attached packages:
[1] foreach_1.4.0

loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0  iterators_1.0.6
Run Code Online (Sandbox Code Playgroud)

Ste*_*ton 13

听起来你想要一个不耐烦的"停止"错误处理版本.您可以通过编写自定义组合函数来实现它,并foreach在每个结果返回后立即调用它.要做到这一点,你需要:

  • 使用支持combine即时呼叫的后端,如doMPIdoRedis
  • 不要启用 .multicombine
  • 设置.inorderFALSE
  • 设置.init为某物(如NULL)

这是一个例子:

library(foreach)
parfun <- function(errval, n) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      if (inherits(y, 'error')) {
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      }
      c(x, y)
    }
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
            .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    }
  }
  callCC(abortable)
}
Run Code Online (Sandbox Code Playgroud)

请注意,我还将错误处理设置为"pass",因此foreach将使用错误对象调用combine函数.无论内部和后端使用的错误处理如何,该callCC函数都用于从foreach循环返回foreach.在这种情况下,callCC将调用该abortable函数,向其传递一个强制callCC立即返回的函数对象.通过从combine函数调用该函数,我们可以foreach在检测到错误对象时从循环中转义,并callCC返回该对象.有关?callCC更多信息,请参阅

实际上,您可以在parfun没有注册并行后端的情况下使用,并在foreach执行抛出错误的任务时验证循环是否"中断",但这可能需要一段时间,因为任务是按顺序执行的.例如,如果没有注册后端,则执行此操作需要20秒:

print(system.time(parfun(3, 4)))
Run Code Online (Sandbox Code Playgroud)

parfun并行执行时,我们需要做的不仅仅是打破foreach循环:我们还需要停止工作,否则他们将继续计算他们分配的任务.有了doMPI,工人可以停止使用mpi.abort:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)
}
Run Code Online (Sandbox Code Playgroud)

请注意,在循环中止后无法使用集群对象,因为事情没有被正确清理,这就是正常的"停止"错误处理不能以这种方式工作的原因.


Bea*_*eld 4

这不是对您问题的直接答案,但when()如果满足条件,使用您可以避免进入循环:

x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
  when( !is.element(k, 2:6) ) %do%
  {
    cat("Element ", k, "\n")
    Sys.sleep(0.5)
    k
  }
Run Code Online (Sandbox Code Playgroud)

编辑:

我忘记了一些事情:我认为这是设计使然,你不能只是停止 foreach 循环。如果并行运行循环,则每一轮都是独立处理的,这意味着当您停止整个循环时,k=2无法预测进程是否已k=1终止或仍在运行。因此,使用when()条件可以得到确定的结果。

编辑2:考虑您的评论的另一个解决方案。

shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
  {
    if( !shouldStop ){
      # put your time consuming code here
      cat("Element ", k, "\n")
      Sys.sleep(0.5)
      shouldStop <- shouldStop ||  is.element(k, 2:6)
      k
    }
  }
Run Code Online (Sandbox Code Playgroud)

使用此解决方案,在停止条件成立时正在运行的进程仍会计算结束,但可以避免所有即将到来的进程消耗时间。