最近,我一直在使用future(andfuture.apply和furrr) 在 R 中进行一些并行处理,这大部分都很棒,但我偶然发现了一些我无法解释的东西。这可能是某个地方的错误,但也可能是我编码的草率。如果有人能解释这种行为,我们将不胜感激。
我正在对数据的不同子组运行模拟。对于每个组,我想运行模拟n时间,然后计算结果的一些汇总统计数据。以下是一些示例代码,用于重现我的基本设置并演示我所看到的问题:
library(tidyverse)
library(future)
library(future.apply)
# Helper functions
#' Calls out to `free` to get total system memory used
sys_used <- function() {
.f <- system2("free", "-b", stdout = TRUE)
as.numeric(unlist(strsplit(.f[2], " +"))[3])
}
#' Write time, and memory usage to log file in CSV format
#' @param .f the file to write to
#' @param .id identifier for the row to be written
mem_string <- function(.f, .id) {
.s <- paste(.id, Sys.time(), sys_used(), Sys.getpid(), sep = ",")
write_lines(.s, .f, append = TRUE)
}
# Inputs
fake_inputs <- 1:16
nsim <- 100
nrows <- 1e6
log_file <- "future_mem_leak_log.csv"
if (fs::file_exists(log_file)) fs::file_delete(log_file)
test_cases <- list(
list(
name = "multisession-sequential",
plan = list(multisession, sequential)
),
list(
name = "sequential-multisession",
plan = list(sequential, multisession)
)
)
# Test code
for (.t in test_cases) {
plan(.t$plan)
# loop over subsets of the data
final_out <- future_lapply(fake_inputs, function(.i) {
# loop over simulations
out <- future_lapply(1:nsim, function(.j) {
# in real life this would be doing simulations,
# but here we just create "results" using rnorm()
res <- data.frame(
id = rep(.j, nrows),
col1 = rnorm(nrows) * .i,
col2 = rnorm(nrows) * .i,
col3 = rnorm(nrows) * .i,
col4 = rnorm(nrows) * .i,
col5 = rnorm(nrows) * .i,
col6 = rnorm(nrows) * .i
)
# write memory usage to file
mem_string(log_file, .t$name)
# in real life I would write res to file to read in later, but here we
# only return head of df so we know the returned value isn't filling up memory
res %>% slice_head(n = 10)
})
})
# clean up any leftover objects before testing the next plan
try(rm(final_out))
try(rm(out))
try(rm(res))
}
Run Code Online (Sandbox Code Playgroud)
外循环用于测试两种并行化策略:是对数据子集进行并行化,还是对 100 个模拟进行并行化。
plan(multicore)在这里会更好(尽管我确定是否会),但我更感兴趣的是弄清楚发生了什么plan(multisession)我在 8-vCPU Linux EC2 上运行了此程序(如果人们需要,我可以提供更多规格),并根据结果创建了以下绘图(在底部绘制代码以实现可重复性):
首先,plan(list(multisession, sequential))速度更快(如预期,请参阅上面的警告),但我感到困惑的是内存配置文件。总的系统内存使用量保持相当恒定plan(list(multisession, sequential)),正如我所期望的那样,因为我假设res每次循环时都会覆盖该对象。
然而,随着程序运行,内存使用量稳步增长。plan(list(sequential, multisession))似乎每次通过循环res都会创建对象,然后在某个地方徘徊,占用内存。在我的真实示例中,它足够大,以至于填满了我的整个(32GB)系统内存,并在大约一半时终止了该进程。
这是真正让我困惑的部分!当我将外部更改future_lapply为常规lapply并设置时plan(multisession),我看不到它!从我对“未来:拓扑”小插图的阅读来看,这应该是相同的plan(list(sequential, multisession)),但该图根本没有显示内存增长(事实上,它与plan(list(multisession, sequential))上面的图中几乎相同)
实际上,我最初是用 发现的,furrr::future_map_dfr()但为了确定这不是 中的错误furrr,我尝试了future.apply::future_lapply()并得到了显示的结果。我尝试用 just 对其进行编码future::future(),但得到了非常不同的结果,但很可能是因为我编码的内容实际上并不等效。furrr在没有或提供的抽象层的情况下,我没有太多直接使用 futures 的经验future.apply。
同样,我们非常感谢对此的任何见解。
library(tidyverse)
logDat <- read_csv("future_mem_leak_log.csv",
col_names = c("plan", "time", "sys_used", "pid")) %>%
group_by(plan) %>%
mutate(
start = min(time),
time_elapsed = as.numeric(difftime(time, start, units = "secs"))
)
ggplot(logDat, aes(x = time_elapsed/60, y = sys_used/1e9, group = plan, colour = plan)) +
geom_line() +
xlab("Time elapsed (in mins)") + ylab("Memory used (in GB)") +
ggtitle("Memory Usage\n list(multisession, sequential) vs list(sequential, multisession)")
Run Code Online (Sandbox Code Playgroud)