在foreach中构建.combine函数

llr*_*lrs 5 parallel-processing foreach r

我有一个我想要并行执行的进程,但是由于一些奇怪的错误我失败了.现在我正在考虑组合,并计算主CPU上的失败任务.但是我不知道如何为.combine编写这样的函数.

怎么写?

我知道如何编写它们,例如这个答案提供了一个例子,但它没有提供如何处理失败的任务,也没有重复在主服务器上重复任务.

我会做的事情如下:

foreach(i=1:100, .combine = function(x, y){tryCatch(?)} %dopar% {
    long_process_which_fails_randomly(i)
}
Run Code Online (Sandbox Code Playgroud)

但是,如何在.combine函数中使用该任务的输入(如果可以的话)?或者我应该在内部提供%dopar%返回标志或列表来计算它?

Ste*_*ton 2

要在组合函数中执行任务,您需要在 foreach 循环体返回的结果对象中包含额外的信息。在这种情况下,这将是一个错误标志和 的值i。有很多方法可以做到这一点,但这里有一个例子:

comb <- function(results, x) {
  i <- x$i
  result <- x$result
  if (x$error) {
    cat(sprintf('master computing failed task %d\n', i))
    # Could call function repeatedly until it succeeds,
    # but that could hang the master
    result <- try(fails_randomly(i))
  }
  results[i] <- list(result)  # guard against a NULL result
  results
}

r <- foreach(i=1:100, .combine='comb',
             .init=vector('list', 100)) %dopar% {
  tryCatch({
    list(error=FALSE, i=i, result=fails_randomly(i))
  },
  error=function(e) {
    list(error=TRUE, i=i, result=e)
  })
}
Run Code Online (Sandbox Code Playgroud)

我很想通过重复执行并行循环直到计算完所有任务来处理这个问题:

x <- rnorm(100)
results <- lapply(x, function(i) simpleError(''))

# Might want to put a limit on the number of retries
repeat {
  ix <- which(sapply(results, function(x) inherits(x, 'error')))
  if (length(ix) == 0)
    break

  cat(sprintf('computing tasks %s\n', paste(ix, collapse=',')))
  r <- foreach(i=x[ix], .errorhandling='pass') %dopar% {
    fails_randomly(i)
  }

  results[ix] <- r
}
Run Code Online (Sandbox Code Playgroud)

请注意,此解决方案使用的.errorhandling选项在可能发生错误时非常有用。有关此选项的更多信息,请参见 foreach 手册页。