Cor*_*nes 19 parallel-processing foreach r break
我使用R包foreach()
用%dopar%
做并行长(〜天)计算.我希望能够在其中一个产生错误的情况下停止整个计算集.但是,我还没有找到实现这一目标的方法,从文档和各种论坛我发现没有迹象表明这是可能的.特别是,break()
不起作用,stop()
只停止当前计算,而不是整个foreach
循环.
请注意,我不能使用简单的for循环,因为最终我想使用doRNG包并行化它.
下面是我在尝试的一个简化的,可重复的版本(串行这里显示%do%
,不过我用的时候有同样的问题doRNG
和%dopar%
).请注意,实际上我想并行运行此循环的所有元素(此处为10).
library(foreach)
myfunc <- function() {
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
cat("Element ", k, "\n")
Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
if(is.element(k, 2:6)) {
cat("Should stop\n")
stop("Has stopped")
}
k
}
return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"
Run Code Online (Sandbox Code Playgroud)
我想要实现的是整个foreach循环可以在某些条件下立即退出(这里stop()
遇到时).
我发现无法实现这一目标foreach
.似乎我需要一种方法向所有其他进程发送消息以使它们也停止.
如果不可能foreach
,有没有人知道替代品?我也试图用这个来实现这个目标parallel::mclapply
,但这也不起作用.
> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)
locale:
[1] C/UTF-8/C/C/C/C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] foreach_1.4.0
loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6
Run Code Online (Sandbox Code Playgroud)
Ste*_*ton 13
听起来你想要一个不耐烦的"停止"错误处理版本.您可以通过编写自定义组合函数来实现它,并foreach
在每个结果返回后立即调用它.要做到这一点,你需要:
combine
即时呼叫的后端,如doMPI
或doRedis
.multicombine
.inorder
为FALSE
.init
为某物(如NULL
)这是一个例子:
library(foreach)
parfun <- function(errval, n) {
abortable <- function(errfun) {
comb <- function(x, y) {
if (inherits(y, 'error')) {
warning('This will leave your parallel backend in an inconsistent state')
errfun(y)
}
c(x, y)
}
foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
.combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
if (i == errval)
stop('testing abort')
Sys.sleep(10)
i
}
}
callCC(abortable)
}
Run Code Online (Sandbox Code Playgroud)
请注意,我还将错误处理设置为"pass",因此foreach
将使用错误对象调用combine函数.无论内部和后端使用的错误处理如何,该callCC
函数都用于从foreach
循环返回foreach
.在这种情况下,callCC
将调用该abortable
函数,向其传递一个强制callCC
立即返回的函数对象.通过从combine函数调用该函数,我们可以foreach
在检测到错误对象时从循环中转义,并callCC
返回该对象.有关?callCC
更多信息,请参阅
实际上,您可以在parfun
没有注册并行后端的情况下使用,并在foreach
执行抛出错误的任务时验证循环是否"中断",但这可能需要一段时间,因为任务是按顺序执行的.例如,如果没有注册后端,则执行此操作需要20秒:
print(system.time(parfun(3, 4)))
Run Code Online (Sandbox Code Playgroud)
当parfun
并行执行时,我们需要做的不仅仅是打破foreach
循环:我们还需要停止工作,否则他们将继续计算他们分配的任务.有了doMPI
,工人可以停止使用mpi.abort
:
library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
cat(sprintf('Caught error: %s\n', conditionMessage(r)))
mpi.abort(cl$comm)
}
Run Code Online (Sandbox Code Playgroud)
请注意,在循环中止后无法使用集群对象,因为事情没有被正确清理,这就是正常的"停止"错误处理不能以这种方式工作的原因.
这不是对您问题的直接答案,但when()
如果满足条件,使用您可以避免进入循环:
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %:%
when( !is.element(k, 2:6) ) %do%
{
cat("Element ", k, "\n")
Sys.sleep(0.5)
k
}
Run Code Online (Sandbox Code Playgroud)
编辑:
我忘记了一些事情:我认为这是设计使然,你不能只是停止 foreach 循环。如果并行运行循环,则每一轮都是独立处理的,这意味着当您停止整个循环时,k=2
无法预测进程是否已k=1
终止或仍在运行。因此,使用when()
条件可以得到确定的结果。
编辑2:考虑您的评论的另一个解决方案。
shouldStop <- FALSE
x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do%
{
if( !shouldStop ){
# put your time consuming code here
cat("Element ", k, "\n")
Sys.sleep(0.5)
shouldStop <- shouldStop || is.element(k, 2:6)
k
}
}
Run Code Online (Sandbox Code Playgroud)
使用此解决方案,在停止条件成立时正在运行的进程仍会计算结束,但可以避免所有即将到来的进程消耗时间。