有没有一种配置方法furrr::future_map
可以允许嵌套用例?考虑以下代码:
library(furrr)
library(tictoc)
# The problem is easier to reason about if you take N
# smaller than your number of cores, and M big.
N = 2
M = 100
plan(sequential)
tic()
x = future_map(1:N, function(i){
furrr::future_map(1:M,function(j){
Sys.sleep(1/M)
return(1)
})
})
toc() # 2sec + overhead
plan(multiprocess)
tic()
x = future_map(1:N, function(i){
furrr::future_map(1:M,function(j){
Sys.sleep(1/M)
return(1)
})
})
toc() # one sec + overhead !!
Run Code Online (Sandbox Code Playgroud)
第一个应该需要 2 秒多一点的时间。还行吧。但是,即使在千核机器上,有没有办法让第二个运行时间少于 1 秒?
我的用例如下:一些子任务比其他子任务需要更长的时间才能完成,当一些子任务完成时,一些核心可以自由地进一步分派更长的任务。
但furrr默认情况下不会这样做,并且长期运行的任务最终只在一个核心上。该问题与上面代码中显示的问题相同:是否有一种方法可以在某些核心空闲的情况下让furrr重新分派内部任务?
这是不可能做到的,还是我错过了furrr/future 调用的参数?
我必须从 Web API (NCBI entrez) 检索大型数据集,该数据集将我每秒的请求数限制为一定数量,例如 10 个(示例代码将在没有 API 密钥的情况下将您限制为 3 个)。我使用 Furrr 的 future_* 函数来并行化请求以尽快获取它们,如下所示:
library(tidyverse)
library(rentrez)
library(furrr)
plan(multiprocess)
api_key <- "<api key>"
# this will return a crap-ton of results
srch <- entrez_search("nuccore", "Homo sapiens", use_history=T, api_key=api_key)
total <- srch$count
per_request <- 500 # get 500 records per parallel request
nrequest <- total %/% per_request + as.logical(total %% per_request)
result <- future_map(seq(nrequest),function(x) {
rstart <- (x - 1) * per_request
return(entrez_fetch(
"nuccore",
web_history = srch$web_history,
rettype="fasta",
retmode="xml",
retstart=rstart, …
Run Code Online (Sandbox Code Playgroud) 我正在开发一个闪亮的应用程序,我使用furrr::
和将耗时的计算拆分为多个会话future
。我想在每次(或每 n)次计算后更新进度条。我发现下面的示例会更新进度条,但只有在会话完全完成后才会更新。这意味着最后 6 次更新几乎同时发生。对于非 Shiny 版本有争论.progress = T
会导致我想要的更新(请参阅第二个示例)。
有没有类似的东西可以用于闪亮的进度条?如果更新仅取决于第一个工作人员,那就更好了,因为所有工作人员的计算几乎都是相同的。
感谢您提前提供任何帮助!
示例1:
library(shiny)
library(progressr)
app <- shinyApp(
ui = fluidPage(
plotOutput("plot")
),
server = function(input, output) {
output$plot <- renderPlot({
X <- 1:24
future::plan(future::multisession(workers = 6))
withProgressShiny(message = "Calculation in progress",
detail = "This may take a while ...", value = 0, {
p <- progressor(along = X)
y <- furrr::future_map(X, function(x) {
Sys.sleep(1)
p()
})
})
plot(cars)
})
}
)
Run Code Online (Sandbox Code Playgroud)
示例2:
X <- 1:24 …
Run Code Online (Sandbox Code Playgroud) 我目前正在开发一个包,假设它被称为 myPack。我有一个名为 myFunc1 的函数和另一个名为 myFunc2 的函数,它看起来像这样:
myFunc2 <- function(x, parallel = FALSE) {
if(parallel) future::plan(future::multiprocess)
values <- furrr::future_map(x, myFunc1)
values
}
Run Code Online (Sandbox Code Playgroud)
现在,如果我在不并行的情况下调用 myFunc2,它会起作用。但是,如果我使用 parallel = TRUE 调用它,则会出现以下错误:
Error: Unexpected result (of class ‘snow-try-error’ != ‘FutureResult’)
retrieved for MultisessionFuture future (label = ‘<none>’, expression =
‘{; do.call(function(...) {; ...future.f.env <- environment(...future.f);
if (!is.null(...future.f.env$`~`)) {; if
(is_bad_rlang_tilde(...future.f.env$`~`)) {; ...future.f.env$`~` <-
base::`~`; ...; }); }, args = future.call.arguments); }’): there is no
package called 'myPack'. This suggests that the communication with
MultisessionFuture worker …
Run Code Online (Sandbox Code Playgroud) 由于内存(和速度)问题,我希望在data.table内进行一些计算,而不是在data.table外进行。
以下代码有100.000行,但我正在处理4000万行。
library(tictoc)
library(data.table) # version 1.11.8
library(purrr)
library(furrr)
plan(multiprocess)
veryfing_function <- function(vec1, vec2){
vector <- as.vector(outer(vec1, vec2, paste0))
split(vector, ceiling(seq_along(vector)/length(vec1)))
}
dt <- data.table(letters = replicate(1e6, sample(letters[1:5], 3, TRUE), simplify = FALSE),
numbers = replicate(1e6, sample(letters[6:10], 3, TRUE), simplify = FALSE))
tic()
result1 <- future_map2(dt$letters, dt$numbers, veryfing_function)
toc()
tic()
result2 <- mapply(veryfing_function, dt$letters, dt$numbers, SIMPLIFY = FALSE)
toc()
tic()
dt[, result := future_map2(letters, numbers, veryfing_function)]
toc()
tic()
dt[, result2 := mapply(veryfing_function, letters, numbers, SIMPLIFY = FALSE)]
toc()
Run Code Online (Sandbox Code Playgroud)
所有变体的输出都是相同的,并且符合预期。基准是:
26秒72秒38秒105秒,所以我看不到使用data.table内的函数或使用mapply的优势。 …
我有一个tibble,其中包含一个列表列,其中包含向量。我想创建一个新列来说明每个向量的长度。由于这个数据集很大(3M 行),我想使用该包来减少一些处理时间furrr
。不过,看起来purrr
比 更快furrr
。怎么会?
为了演示这个问题,我首先模拟一些数据。不要费心去理解模拟部分的代码,因为它与问题无关。
数据模拟功能
library(stringi)
library(rrapply)
library(tibble)
simulate_data <- function(nrows) {
split_func <- function(x, n) {
unname(split(x, rep_len(1:n, length(x))))
}
randomly_subset_vec <- function(x) {
sample(x, sample(length(x), 1))
}
tibble::tibble(
col_a = rrapply(object = split_func(
x = setNames(1:(nrows * 5),
stringi::stri_rand_strings(nrows * 5,
2)),
n = nrows
),
f = randomly_subset_vec),
col_b = runif(nrows)
)
}
Run Code Online (Sandbox Code Playgroud)
模拟数据
set.seed(2021)
my_data <- simulate_data(3e6) # takes about 1 minute to run on my …
Run Code Online (Sandbox Code Playgroud) 我想创建一个函数,它接受一个函数并对 a 中的每一行应用一次,tibble
参数存储在 a 的相应命名列中tibble
我意识到这听起来有点奇怪,但我希望面向用户的函数/功能简单。
在大多数情况下,处理会花费很多时间,所以我真的更喜欢有进度条功能,这就是我发现很大麻烦的地方:
此代码有效(然后没有进度条):
library(tibble)
library(dplyr)
library(purrr)
library(furrr)
library(tidyr)
library(wrassp)
library(progressr)
xf <- function(x,trim,na.rm,ds="ded"){
return(x*trim*na.rm)
}
xf2 <- function(x,trim,na.rm,ds="ded"){
return(list("a"=x,"b"=trim))
}
xf3 <- function(x,trim,na.rm,ds="ded"){
return(data.frame("a"=x,"b"=trim))
}
mymap <- function(f,...){
plan(multisession)
exDF <- tribble(
~x, ~trim, ~na.rm, ~notarg, ~listOfFiles, ~toFile,
0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE,
0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE
)
dotArgs <- list(...)
dotArgsRT <- as_tibble_row(dotArgs)
dotArgsNames <- names(dotArgs)
allArgsNames <- formalArgs(f)
exDF %>%
select(-any_of(!!dotArgsNames)) %>%
bind_cols(dotArgsRT) %>%
select(any_of(allArgsNames)) %>%
rowwise() …
Run Code Online (Sandbox Code Playgroud) 到目前为止,我发现lapply
在 R 中使用并行的最简单方法是通过以下示例代码:
library(parallel)
library(pbapply)
cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})
results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
Run Code Online (Sandbox Code Playgroud)
它有一个非常有用的功能,即为结果提供进度条,并且当不需要并行计算时,通过设置 可以很容易地重用相同的代码cl = NULL
。
然而,我注意到的一个问题是它pblapply
正在批量循环遍历列表。例如,如果一个工人在某项任务上停留了很长时间,那么剩下的工人将等待该任务完成,然后再开始一批新的工作。对于某些任务,这会为工作流程增加大量不必要的时间。
我的问题:
是否有任何类似的并行框架允许工作人员独立运行?进度条和重用代码的能力cl=NULL
将是一个很大的优势。
也许可以修改现有代码pbapply
来添加此选项/功能?
dataframe
给定一个iris
默认值,如何配置purrr::map_dfr()
函数在每一行上运行dataframe
并执行函数foo
。
这是我的 df 的一行,请考虑到该值始终是一个大 JSON:
structure(list(Key = "2019/01/04/14/kuku@pupu.com_2ed026cb-8e9f-4392-9cc4-9f580b9d3aab_1345a5a4-3d5b-48a0-a678-67ed09a6f487_2019-01-04-14-52-43-537",
LastModified = "2019-01-04T14:52:44.000Z", ETag = "\"1c6269ab8b7baa85f0d2567de417f0d0\"",
Size = 35280, Owner = "e7c0d260939d15d18866126da3376642e2d4497f18ed762b608ed2307778bdf1",
StorageClass = "STANDARD", Bucket = "comp-kukupupu-streamed-data",
user_name = "kuku@pupu.com", value = list(---here goes a large json),
obs_id = 1137L), row.names = 1L, class = "data.frame")
Run Code Online (Sandbox Code Playgroud)
我的功能是:
extract_scroll_data <- function(df) {
tryCatch({
j <- fromJSON(unlist(df$value))
if (is_empty(fromJSON(j$sensorsData)) | is_empty(fromJSON(j$eventList))) {
return(tibble())
} else {
return(set_names(as_tibble(fromJSON(j$eventList, bigint_as_char = TRUE),
.name_repair = "unique"),
nm …
Run Code Online (Sandbox Code Playgroud) 我想让以下函数使用furrr
包而不是purrr
包并行运行。
library(furrr)
library(tidyverse)
input <- list(element1 = tibble::tibble(a = c(1, 2), b = c(2, 2)),
element2 = tibble::tibble(a = c(1, 2), b = c(4, 4))
)
multiplier <- function(data, var1, var2){
purrr::map_df(.x = data,
.f = ~ .x %>%
dplyr::mutate(product = {{var1}} * {{var2}})
)
}
multiplier(input, a, b)
Run Code Online (Sandbox Code Playgroud)
但是,当我将其转换为furrr
等效项时,出现错误。
multiplier_parallel <- function(data, var1, var2){
furrr::future_map_dfr(.x = data,
.f = ~ .x %>%
dplyr::mutate(product = {{var1}} * {{var2}})
)
}
future::plan(multiprocess)
multiplier_parallel(input, a, b) …
Run Code Online (Sandbox Code Playgroud) 我目前正在使用furrr
我的模型创建更有组织的执行。我使用 adata.frame
以有序的方式将参数传递给函数,然后使用 afurrr::future_map()
将函数映射到所有参数。在我的本地计算机 (OSX) 上使用顺序和多核 future 时,该函数可以完美运行。
现在,我想测试我的代码,创建我自己的 AWS 实例集群(正如此处所示)。
我使用链接的文章代码创建了一个函数:
make_cluster_ec2 <- function(public_ip){
ssh_private_key_file <- Sys.getenv('PEM_PATH')
github_pac <- Sys.getenv('PAC')
cl_multi <- future::makeClusterPSOCK(
workers = public_ip,
user = "ubuntu",
rshopts = c(
"-o", "StrictHostKeyChecking=no",
"-o", "IdentitiesOnly=yes",
"-i", ssh_private_key_file
),
rscript_args = c(
"-e", shQuote("local({p <- Sys.getenv('R_LIBS_USER'); dir.create(p, recursive = TRUE, showWarnings = FALSE); .libPaths(p)})"),
"-e", shQuote("install.packages('devtools')"),
"-e", shQuote(glue::glue("devtools::install_github('user/repo', auth_token = '{github_pac}')"))
),
dryrun = FALSE)
return(cl_multi)
}
Run Code Online (Sandbox Code Playgroud)
然后,我创建集群对象,然后检查它是否连接到正确的实例
public_ids <- c('public_ip_1', 'public_ip_2') …
Run Code Online (Sandbox Code Playgroud)