Cor*_*nes 19 parallel-processing foreach r break
我使用R包foreach()用%dopar%做并行长(〜天)计算.我希望能够在其中一个产生错误的情况下停止整个计算集.但是,我还没有找到实现这一目标的方法,从文档和各种论坛我发现没有迹象表明这是可能的.特别是,break()不起作用,stop()只停止当前计算,而不是整个foreach循环.
请注意,我不能使用简单的for循环,因为最终我想使用doRNG包并行化它.
下面是我在尝试的一个简化的,可重复的版本(串行这里显示%do%,不过我用的时候有同样的问题doRNG和%dopar%).请注意,实际上我想并行运行此循环的所有元素(此处为10).
library(foreach)
myfunc <- function() {
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
cat("Element ", k, "\n")
Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
if(is.element(k, 2:6)) {
cat("Should stop\n")
stop("Has stopped")
}
k
}
return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"
Run Code Online (Sandbox Code Playgroud)
我想要实现的是整个foreach循环可以在某些条件下立即退出(这里stop()遇到时).
我发现无法实现这一目标foreach.似乎我需要一种方法向所有其他进程发送消息以使它们也停止.
如果不可能foreach,有没有人知道替代品?我也试图用这个来实现这个目标parallel::mclapply,但这也不起作用.
> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)
locale:
[1] C/UTF-8/C/C/C/C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] foreach_1.4.0
loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6
Run Code Online (Sandbox Code Playgroud)
Ste*_*ton 13
听起来你想要一个不耐烦的"停止"错误处理版本.您可以通过编写自定义组合函数来实现它,并foreach在每个结果返回后立即调用它.要做到这一点,你需要:
combine即时呼叫的后端,如doMPI或doRedis.multicombine.inorder为FALSE.init为某物(如NULL)这是一个例子:
library(foreach)
parfun <- function(errval, n) {
abortable <- function(errfun) {
comb <- function(x, y) {
if (inherits(y, 'error')) {
warning('This will leave your parallel backend in an inconsistent state')
errfun(y)
}
c(x, y)
}
foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
.combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
if (i == errval)
stop('testing abort')
Sys.sleep(10)
i
}
}
callCC(abortable)
}
Run Code Online (Sandbox Code Playgroud)
请注意,我还将错误处理设置为"pass",因此foreach将使用错误对象调用combine函数.无论内部和后端使用的错误处理如何,该callCC函数都用于从foreach循环返回foreach.在这种情况下,callCC将调用该abortable函数,向其传递一个强制callCC立即返回的函数对象.通过从combine函数调用该函数,我们可以foreach在检测到错误对象时从循环中转义,并callCC返回该对象.有关?callCC更多信息,请参阅
实际上,您可以在parfun没有注册并行后端的情况下使用,并在foreach执行抛出错误的任务时验证循环是否"中断",但这可能需要一段时间,因为任务是按顺序执行的.例如,如果没有注册后端,则执行此操作需要20秒:
print(system.time(parfun(3, 4)))
Run Code Online (Sandbox Code Playgroud)
当parfun并行执行时,我们需要做的不仅仅是打破foreach循环:我们还需要停止工作,否则他们将继续计算他们分配的任务.有了doMPI,工人可以停止使用mpi.abort:
library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
cat(sprintf('Caught error: %s\n', conditionMessage(r)))
mpi.abort(cl$comm)
}
Run Code Online (Sandbox Code Playgroud)
请注意,在循环中止后无法使用集群对象,因为事情没有被正确清理,这就是正常的"停止"错误处理不能以这种方式工作的原因.
这不是对您问题的直接答案,但when()如果满足条件,使用您可以避免进入循环:
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
when( !is.element(k, 2:6) ) %do%
{
cat("Element ", k, "\n")
Sys.sleep(0.5)
k
}
Run Code Online (Sandbox Code Playgroud)
编辑:
我忘记了一些事情:我认为这是设计使然,你不能只是停止 foreach 循环。如果并行运行循环,则每一轮都是独立处理的,这意味着当您停止整个循环时,k=2无法预测进程是否已k=1终止或仍在运行。因此,使用when()条件可以得到确定的结果。
编辑2:考虑您的评论的另一个解决方案。
shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
{
if( !shouldStop ){
# put your time consuming code here
cat("Element ", k, "\n")
Sys.sleep(0.5)
shouldStop <- shouldStop || is.element(k, 2:6)
k
}
}
Run Code Online (Sandbox Code Playgroud)
使用此解决方案,在停止条件成立时正在运行的进程仍会计算结束,但可以避免所有即将到来的进程消耗时间。