Mai*_*ura 40 r progress-bar mclapply
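I love the setting .progress = 'text' in plyr's llply. However, it makes me anxious not to know how far along an mclapply (from the multicore package) is, since list items are sent to the various cores and only collated at the end.
I've been outputting messages like *currently in sim_id # ....* but that isn't very helpful, because it gives no indication of what percentage of list items is complete (although it does help to know that my script isn't stuck and is moving along).
Can someone suggest other ideas that would let me look at my .Rout file and get a sense of progress? I've thought about adding a manual counter, but I can't see how I would implement it, since mclapply must finish processing all list items before it can give any feedback.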
fot*_*ton 26
Because mclapply spawns multiple processes, one might want to use fifos, pipes, or even sockets. Consider the following example:
    library(multicore)

    finalResult <- local({
        f <- fifo(tempfile(), open = "w+b", blocking = TRUE)
        if (inherits(fork(), "masterProcess")) {
            # Child: read progress increments from the fifo and report them
            progress <- 0.0
            while (progress < 1 && !isIncomplete(f)) {
                msg <- readBin(f, "double")
                progress <- progress + as.numeric(msg)
                cat(sprintf("Progress: %.2f%%\n", progress * 100))
            }
            exit()
        }
        numJobs <- 100
        result <- mclapply(1:numJobs, function(...) {
            # Do something fancy here
            # ...
            # Send some progress update
            writeBin(1/numJobs, f)
            # Some arbitrary result
            sample(1000, 1)
        })
        close(f)
        result
    })

    cat("Done\n")
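Here, a temporary file is used as a fifo, and the main process forks a child whose only duty is to report the current progress. The main process continues by calling mclapply, where the expression (more precisely, the expression block) to be evaluated writes partial progress information to the fifo buffer by means of writeBin.
As this is only a simple example, you will probably have to adapt the output to your needs. HTH!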
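One possible adaptation, sketched here as an alternative rather than as the approach above: if the goal is only to check progress from outside the running job, each worker can append a line to a shared log file, and the number of lines gives the completion count. This swaps the fifo and the forked reader for a plain file. The names progress_log, slow_task, n_jobs and n_cores are hypothetical, and the sketch assumes the parallel package and short single-line writes.

```r
library(parallel)

# Hypothetical names throughout: progress_log, slow_task, n_jobs, n_cores.
progress_log <- tempfile(fileext = ".log")
n_jobs <- 20
# Forking is unavailable on Windows, so fall back to a single core there.
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

slow_task <- function(i) {
  Sys.sleep(0.01)  # stand-in for real work
  # Append one short line per finished item; all workers share the file.
  cat(sprintf("%d\n", i), file = progress_log, append = TRUE)
  i^2
}

result <- mclapply(seq_len(n_jobs), slow_task, mc.cores = n_cores)

# From another R session during the run (or afterwards), estimate completion
# by counting the lines written so far.
done <- length(readLines(progress_log))
cat(sprintf("Progress: %.0f%%\n", 100 * done / n_jobs))
```

While the job is running, the same line count can be read from a second R session or with a shell tool such as wc -l, which avoids touching the .Rout file at all.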
waf*_*hin 13
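This is essentially another version of @fotNelson's solution, with some modifications:
parallel is used rather than multicore, which has since been removed from CRAN.
Hope this helps someone...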
    library(parallel)

    #-------------------------------------------------------------------------------
    #' Wrapper around mclapply to track progress
    #'
    #' Based on http://stackoverflow.com/questions/10984556
    #'
    #' @param X a vector (atomic or list) or an expressions vector. Other
    #'   objects (including classed objects) will be coerced by 'as.list'
    #' @param FUN the function to be applied to each element of X
    #' @param ... optional arguments to 'FUN'
    #' @param mc.preschedule see mclapply
    #' @param mc.set.seed see mclapply
    #' @param mc.silent see mclapply
    #' @param mc.cores see mclapply
    #' @param mc.cleanup see mclapply
    #' @param mc.allow.recursive see mclapply
    #' @param mc.progress track progress?
    #' @param mc.style style of progress bar (see txtProgressBar)
    #'
    #' @examples
    #' x <- mclapply2(1:1000, function(i, y) Sys.sleep(0.01))
    #' x <- mclapply2(1:3, function(i, y) Sys.sleep(1), mc.cores=1)
    #'
    #' dat <- lapply(1:10, function(x) rnorm(100))
    #' func <- function(x, arg1) mean(x)/arg1
    #' mclapply2(dat, func, arg1=10, mc.cores=2)
    #-------------------------------------------------------------------------------
    mclapply2 <- function(X, FUN, ...,
                          mc.preschedule = TRUE, mc.set.seed = TRUE,
                          mc.silent = FALSE, mc.cores = getOption("mc.cores", 2L),
                          mc.cleanup = TRUE, mc.allow.recursive = TRUE,
                          mc.progress = TRUE, mc.style = 3)
    {
        if (!is.vector(X) || is.object(X)) X <- as.list(X)

        if (mc.progress) {
            f <- fifo(tempfile(), open = "w+b", blocking = TRUE)
            p <- parallel:::mcfork()
            pb <- txtProgressBar(0, length(X), style = mc.style)
            setTxtProgressBar(pb, 0)
            progress <- 0
            if (inherits(p, "masterProcess")) {
                # Child process: advance the bar once per message on the fifo
                while (progress < length(X)) {
                    readBin(f, "double")
                    progress <- progress + 1
                    setTxtProgressBar(pb, progress)
                }
                cat("\n")
                parallel:::mcexit()
            }
        }
        tryCatch({
            # The wrapper sits in the FUN position so that unnamed extra
            # arguments in ... cannot be matched to FUN by mistake
            result <- mclapply(X, function(...) {
                    res <- FUN(...)
                    if (mc.progress) writeBin(1, f)
                    res
                },
                ...,
                mc.preschedule = mc.preschedule, mc.set.seed = mc.set.seed,
                mc.silent = mc.silent, mc.cores = mc.cores,
                mc.cleanup = mc.cleanup, mc.allow.recursive = mc.allow.recursive
            )
        }, finally = {
            if (mc.progress) close(f)
        })
        result
    }
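The pbapply package has implemented this for the general case. Both pblapply and pbsapply have a cl argument. From the documentation:
"Parallel processing can be enabled through the cl argument. parLapply is called when cl is a 'cluster' object, mclapply is called when cl is an integer. Showing the progress bar increases the communication overhead between the main process and nodes / child processes compared to the parallel equivalents of the functions without the progress bar. The functions fall back to their original equivalents when the progress bar is disabled (i.e. getOption("pboptions")$type == "none" or dopb() is FALSE). This is the default when interactive() is FALSE (i.e. called from a command line R script)."
If cl is not supplied (or NULL is passed), the default non-parallel lapply is used, also with a progress bar.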
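A minimal sketch of how the cl argument might be used, assuming pbapply is installed from CRAN. The worker slow_square and the choice of two cores are hypothetical and only there for illustration.

```r
# Assumes the pbapply package is installed: install.packages("pbapply")
library(pbapply)

# Hypothetical worker: sleeps briefly, then squares its input.
slow_square <- function(i) {
  Sys.sleep(0.05)
  i^2
}

# An integer cl requests forked workers (mclapply under the hood, per the
# documentation quoted above); cl = NULL would run a plain lapply instead.
res <- pblapply(1:20, slow_square, cl = 2L)
```

As the quoted documentation notes, the bar is disabled by default when interactive() is FALSE, so a script run from the command line may need the pboptions type set explicitly before any progress appears in the .Rout file.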
Here is a function based on @fotNelton's solution that can be applied wherever you would normally use mclapply.
    mcadply <- function(X, FUN, ...) {
        # Runs multicore lapply with progress indicator and transformation to
        # data.table output. Arguments mirror those passed to lapply.
        #
        # Args:
        #   X:   Vector.
        #   FUN: Function to apply to each value of X. Note this is transformed to
        #        a data.frame return if necessary.
        #   ...: Other arguments passed to mclapply.
        #
        # Returns:
        #   data.table stack of each mclapply return value
        #
        # Progress bar code based on https://stackoverflow.com/a/10993589
        require(multicore)
        require(plyr)
        require(data.table)

        local({
            f <- fifo(tempfile(), open = "w+b", blocking = TRUE)
            if (inherits(fork(), "masterProcess")) {
                # Child
                progress <- 0
                print.progress <- 0
                while (progress < 1 && !isIncomplete(f)) {
                    msg <- readBin(f, "double")
                    progress <- progress + as.numeric(msg)
                    # Print every 1%
                    if (progress >= print.progress + 0.01) {
                        cat(sprintf("Progress: %.0f%%\n", progress * 100))
                        print.progress <- floor(progress * 100) / 100
                    }
                }
                exit()
            }

            newFun <- function(...) {
                writeBin(1 / length(X), f)
                return(as.data.frame(FUN(...)))
            }

            result <- as.data.table(rbind.fill(mclapply(X, newFun, ...)))
            close(f)
            cat("Done\n")
            return(result)
        })
    }
You can use the system's echo command to write output from the workers, so simply add the following line to your function:
    myfun <- function(x) {
        if (x %% 5 == 0) system(paste("echo 'now processing:", x, "'"))
        dosomething(mydata[x])
    }

    result <- mclapply(1:10, myfun, mc.cores = 5)

    > now processing: 5
    > now processing: 10
This works if you pass the index, for example: instead of passing a list of data, pass the indices and extract the data inside the worker function.
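The index-passing pattern could look roughly like this. It is a sketch assuming the parallel package and a Unix-like shell for echo. The list mydata and the use of mean as the per-item work are hypothetical stand-ins.

```r
library(parallel)

# Hypothetical data: a list of numeric vectors.
mydata <- lapply(1:10, function(k) rep(k, 5))
# Forking is unavailable on Windows, so fall back to a single core there.
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L

myfun <- function(i) {
  # The index tells the worker which position in the list it is handling.
  if (i %% 5 == 0) {
    system(sprintf("echo 'now processing: %d of %d'", i, length(mydata)))
  }
  mean(mydata[[i]])  # extract the data inside the worker
}

# Iterate over indices rather than over the data itself.
result <- mclapply(seq_along(mydata), myfun, mc.cores = n_cores)
```

Because the items are spread across cores, the reported index is only a rough indicator of overall progress: a high index can appear while lower ones are still being processed.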