89_*_*ple 4 foreach r doparallel
我有一个包含 4500 行的输入 csv 文件。每一行都有一个唯一的 ID,对于每一行,我必须读取一些数据,进行一些计算,并将输出写入一个 csv 文件,以便在我的输出目录中写入 4500 个 csv 文件。一个单独的输出 csv 文件包含一行 8 列的数据由于我必须对输入 csv 的每一行执行相同的计算,我想我可以使用foreach. 以下是逻辑的整体结构
library(doSNOW)
library(foreach)
library(data.table)
input_csv <- fread('inputFile.csv'))
# to track the progres of the loop
iterations <- nrow(input_csv)
pb <- txtProgressBar(max = iterations, style = 3)
progress <- function(n) setTxtProgressBar(pb, n)
opts <- list(progress = progress)
myClusters <- makeCluster(6)
registerDoSNOW(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.options.snow = opts) %dopar%
{
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'.csv')))
return(temp_result)
}
Run Code Online (Sandbox Code Playgroud)
上面的代码工作正常,但在完成input_csv. 我一直在查看我的输出目录,在 N% 的迭代之后,没有文件被写入。我怀疑 foreach 循环是否进入某种睡眠模式?我发现更令人困惑的是,如果我终止工作,重新运行上面的代码,它确实说 16% 或 30%,然后再次变为非活动状态,即每次新运行时,它会以不同的进度水平“休眠”。
在这种情况下,我无法弄清楚如何给出一个最小的可重复示例,但我想如果有人知道我应该检查的任何清单或导致这种情况的潜在问题,那将非常有帮助。谢谢
编辑我仍在努力解决这个问题。如果我能提供更多信息,请告诉我。
EDIT2
我的原件inputFile包含 213164 行。所以我把我的大文件分成 46 个小文件,这样每个文件有 4634 行
library(foreach)
library(data.table)
library(doParallel)
myLs <- split(mydat, (as.numeric(rownames(mydat))-1) %/% 46))
Run Code Online (Sandbox Code Playgroud)
然后我这样做了:
for(pr in 1:46){
input_csv <- myLs[[pr]]
myClusters <- parallel::makeCluster(6)
doParallel::registerDoParallel(myClusters)
results <-
foreach(i = 1:nrow(input_csv),
.packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
.errorhandling = 'remove',
.verbose = TRUE) %dopar%
{
rowRef <- input_csv[i, ]
# read data for the unique location in `rowRef`
weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations
# save the results as csv
fwrite(temp_result, file.path(paste0('output_iter_',i,'_',pr,'.csv')))
gc()
}
parallel::stopCluster(myClusters)
gc()
}
Run Code Online (Sandbox Code Playgroud)
这也有效,直到说 pr = 7 或 pr = 8 迭代,然后不会继续,也不会生成任何错误消息。我感到很困惑。
编辑 这是我的 CPU 使用情况。我只使用了 4 个内核来生成这个图像。任何人都可以解释这张图片中是否有任何内容可以解决我的问题。
您可以使用progressr包以交互方式跟踪内存使用情况。
例如使用furrr包:
library(furrr)
library(pryr)
plan(multisession,workers=6)
library(progressr)
handlers("progress")
#input_csv <- fread('inputFile.csv'))
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)
with_progress({
p <- progressor(along = filesID)
result <- future_map(filesID, function(fileID) {
#rowRef <- input_csv[fileID, ]
# read data for the unique location in `rowRef`
#weather.path <- arrow(paste0(rowRef$locationID'_weather.parquet')))
# do some calculations : simulate memory increase
temp_result <- rnorm(2e7)
# save the results as csv
#fwrite(temp_result, file.path(paste0('output_iter_',fileID,'.csv')))
Sys.sleep(2)
p(sprintf("memory used=%g", pryr::mem_used()))
return(object.size(temp_result))
},.options=future_options(packages=c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
})
[====================================================>-------] 90% memory used=6.75075e+08
Run Code Online (Sandbox Code Playgroud)
同样的方法适用于 foreach。
另一个建议是不要将结果返回到主进程,因为您已经将它们存储在文件中。而不是return(temp_result)您可以输出摘要,例如object.size知道可以在关联的文件中找到完整的结果。