有没有办法跟踪mclapply的进度?

Mai*_*ura 40 r progress-bar mclapply

我爱的设置.progress = 'text'plyr's llply.但是,由于列表项被发送到各个核心,然后在最后进行整理,因此我不得不知道沿着mclapply(从包中multicore)走多远.

我一直在输出消息,*currently in sim_id # ....*但这并不是很有帮助,因为它没有给我一个指示列表项的完成百分比的指示(虽然知道我的脚本没有卡住并移动是有帮助的).

有人可以建议其他想法,让我看看我的.Rout文件,并获得进步感吗?我已经考虑过添加一个手动计数器但是看不清楚我将如何实现它,因为mclapply必须先完成所有列表项的处理才能给出任何反馈.

fot*_*ton 26

由于mclapply产生多个进程的事实,人们可能想要使用fifos,管道甚至套接字.现在考虑以下示例:

library(multicore)

finalResult <- local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
        # Child
        progress <- 0.0
        while (progress < 1 && !isIncomplete(f)) {
            msg <- readBin(f, "double")
            progress <- progress + as.numeric(msg)
            cat(sprintf("Progress: %.2f%%\n", progress * 100))
        } 
        exit()
    }
    numJobs <- 100
    result <- mclapply(1:numJobs, function(...) {
        # Dome something fancy here
        # ...
        # Send some progress update
        writeBin(1/numJobs, f)
        # Some arbitrary result
        sample(1000, 1)
    })
    close(f)
    result
})

cat("Done\n")
Run Code Online (Sandbox Code Playgroud)

这里,临时文件用作fifo,主进程分叉一个孩子,其唯一职责是报告当前进度.主进程继续调用mclapply,其中要评估的表达式(更准确地说,表达式块)通过以下方式将部分进度信息写入fifo缓冲区writeBin.

由于这只是一个简单的例子,您可能需要根据需要调整整个输出内容.HTH!

  • 在`mclapply`的情况下,主进程正在等待所有子进程完成,因此在不执行另一个子进程的情况下,当'mclapply`仍在工作时,无法接收和处理消息. (2认同)

waf*_*hin 13

基本上添加了@ fotNelson解决方案的另一个版本,但有一些修改:

  • 替换mclapply(支持所有mclapply函数)
  • 捕获ctrl-c调用并正常中止
  • 使用内置进度条(txtProgressBar)
  • 用于跟踪进度的选项,并使用指定的进度条样式
  • 使用parallel而不是multicore现在已从CRAN中删除
  • 强制X按照mclapply列出(因此长度(X)给出预期结果)
  • 顶部的roxygen2样式文档

希望这有助于某人......

library(parallel)

#-------------------------------------------------------------------------------
#' Wrapper around mclapply to track progress
#' 
#' Based on http://stackoverflow.com/questions/10984556
#' 
#' @param X         a vector (atomic or list) or an expressions vector. Other
#'                  objects (including classed objects) will be coerced by
#'                  ‘as.list’
#' @param FUN       the function to be applied to
#' @param ...       optional arguments to ‘FUN’
#' @param mc.preschedule see mclapply
#' @param mc.set.seed see mclapply
#' @param mc.silent see mclapply
#' @param mc.cores see mclapply
#' @param mc.cleanup see mclapply
#' @param mc.allow.recursive see mclapply
#' @param mc.progress track progress?
#' @param mc.style    style of progress bar (see txtProgressBar)
#'
#' @examples
#' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
#' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
#' 
#' dat <- lapply(1:10, function(x) rnorm(100)) 
#' func <- function(x, arg1) mean(x)/arg1 
#' mclapply2(dat, func, arg1=10, mc.cores=2)
#-------------------------------------------------------------------------------
mclapply2 <- function(X, FUN, ..., 
    mc.preschedule = TRUE, mc.set.seed = TRUE,
    mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
    mc.cleanup = TRUE, mc.allow.recursive = TRUE,
    mc.progress=TRUE, mc.style=3) 
{
    if (!is.vector(X) || is.object(X)) X <- as.list(X)

    if (mc.progress) {
        f <- fifo(tempfile(), open="w+b", blocking=T)
        p <- parallel:::mcfork()
        pb <- txtProgressBar(0, length(X), style=mc.style)
        setTxtProgressBar(pb, 0) 
        progress <- 0
        if (inherits(p, "masterProcess")) {
            while (progress < length(X)) {
                readBin(f, "double")
                progress <- progress + 1
                setTxtProgressBar(pb, progress) 
            }
            cat("\n")
            parallel:::mcexit()
        }
    }
    tryCatch({
        result <- mclapply(X, ..., function(...) {
                res <- FUN(...)
                if (mc.progress) writeBin(1, f)
                res
            }, 
            mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
            mc.silent = mc.silent, mc.cores = mc.cores,
            mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
        )

    }, finally = {
        if (mc.progress) close(f)
    })
    result
}
Run Code Online (Sandbox Code Playgroud)

  • 这种跟踪进度的方法不适用于 Rstudio(请参阅此处的讨论:http://stackoverflow.com/questions/27314011/),因为在 Rstudio 中会忽略分叉进程(将进度打印到屏幕)的输出。 . (2认同)

Axe*_*man 9

pbapply软件包已针对一般情况实现了此功能.双方pblapplypbsapply有一个cl说法.从文档:

可以通过cl参数启用并行处理.parLapplycl是一个' cluster'对象时mclapply被调用,当cl是一个整数时被调用.与没有进度条的函数的并行等价物相比,显示进度条增加了主进程和节点/子进程之间的通信开销.当禁用进度条(即getOption("pboptions")$type == "none" dopb(),FALSE)时,功能将回退到其原始等效项.这是interactive() if 的默认值FALSE(即从命令行R脚本调用).

如果没有提供cl(或通过NULL),lapply则使用默认的非并行,也包括进度条.


Max*_*nis 7

这是一个基于@ fotNelton解决方案的函数,适用于通常使用mcapply的地方.

mcadply <- function(X, FUN, ...) {
  # Runs multicore lapply with progress indicator and transformation to
  # data.table output. Arguments mirror those passed to lapply.
  #
  # Args:
  # X:   Vector.
  # FUN: Function to apply to each value of X. Note this is transformed to 
  #      a data.frame return if necessary.
  # ...: Other arguments passed to mclapply.
  #
  # Returns:
  #   data.table stack of each mclapply return value
  #
  # Progress bar code based on https://stackoverflow.com/a/10993589
  require(multicore)
  require(plyr)
  require(data.table)

  local({
    f <- fifo(tempfile(), open="w+b", blocking=T)
    if (inherits(fork(), "masterProcess")) {
      # Child
      progress <- 0
      print.progress <- 0
      while (progress < 1 && !isIncomplete(f)) {
        msg <- readBin(f, "double")
        progress <- progress + as.numeric(msg)
        # Print every 1%
        if(progress >= print.progress + 0.01) {
          cat(sprintf("Progress: %.0f%%\n", progress * 100))
          print.progress <- floor(progress * 100) / 100
        }
      }
      exit()
    }

    newFun <- function(...) {
      writeBin(1 / length(X), f)
      return(as.data.frame(FUN(...)))
    }

    result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
    close(f)
    cat("Done\n")
    return(result)
  })
}
Run Code Online (Sandbox Code Playgroud)


Nig*_*ter 6

您可以使用系统 echo 函数从工作线程中进行写入,因此只需将以下行添加到您的函数中:

myfun <- function(x){
if(x %% 5 == 0) system(paste("echo 'now processing:",x,"'"))
dosomething(mydata[x])
}

result <- mclapply(1:10,myfun,mc.cores=5)
> now processing: 5 
> now processing: 10 
Run Code Online (Sandbox Code Playgroud)

例如,如果您传递索引,那么这将起作用,因此不是传递数据列表,而是传递索引并在工作函数中提取数据。