标签: furrr

嵌套的furrr::future_map?

有没有一种配置方法furrr::future_map可以允许嵌套用例?考虑以下代码:

library(furrr)
library(tictoc)

# The problem is easier to reason about if you take N
# smaller than your number of cores, and M big.
N = 2 
M = 100

plan(sequential)
tic()
x = future_map(1:N, function(i){
  furrr::future_map(1:M,function(j){
    Sys.sleep(1/M)
    return(1)
  })
})
toc() # 2sec + overhead

plan(multiprocess)
tic()
x = future_map(1:N, function(i){
  furrr::future_map(1:M,function(j){
    Sys.sleep(1/M)
    return(1)
  })
})
toc() # one sec + overhead !!
Run Code Online (Sandbox Code Playgroud)

第一个应该需要 2 秒多一点的时间。还行吧。但是,即使在千核机器上,有没有办法让第二个运行时间少于 1 秒?

我的用例如下:一些子任务比其他子任务需要更长的时间才能完成,当一些子任务完成时,一些核心可以自由地进一步分派更长的任务。

但furrr默认情况下不会这样做,并且长期运行的任务最终只在一个核心上。该问题与上面代码中显示的问题相同:是否有一种方法可以在某些核心空闲的情况下让furrr重新分派内部任务?

这是不可能做到的,还是我错过了furrr/future 调用的参数?

r furrr

6
推荐指数
1
解决办法
968
查看次数

如何在 R/future/furrr 中对并行 API 请求进行速率限制

我必须从 Web API (NCBI entrez) 检索大型数据集,该数据集将我每秒的请求数限制为一定数量,例如 10 个(示例代码将在没有 API 密钥的情况下将您限制为 3 个)。我使用 Furrr 的 future_* 函数来并行化请求以尽快获取它们,如下所示:

library(tidyverse)
library(rentrez)
library(furrr)

plan(multiprocess)

api_key <- "<api key>"
# this will return a crap-ton of results
srch <- entrez_search("nuccore", "Homo sapiens", use_history=T, api_key=api_key)

total <- srch$count
per_request <- 500 # get 500 records per parallel request
nrequest <- total %/% per_request + as.logical(total %% per_request)

result <- future_map(seq(nrequest),function(x) {
  rstart <- (x - 1) * per_request
  return(entrez_fetch(
    "nuccore",
    web_history = srch$web_history,
    rettype="fasta",
    retmode="xml",
    retstart=rstart, …
Run Code Online (Sandbox Code Playgroud)

parallel-processing multithreading r rate-limiting furrr

6
推荐指数
0
解决办法
744
查看次数

使用furrr和future在Shiny中显示进度条

我正在开发一个闪亮的应用程序,我使用furrr::和将耗时的计算拆分为多个会话future。我想在每次(或每 n)次计算后更新进度条。我发现下面的示例会更新进度条,但只有在会话完全完成后才会更新。这意味着最后 6 次更新几乎同时发生。对于非 Shiny 版本有争论.progress = T会导致我想要的更新(请参阅第二个示例)。

有没有类似的东西可以用于闪亮的进度条?如果更新仅取决于第一个工作人员,那就更好了,因为所有工作人员的计算几乎都是相同的。

感谢您提前提供任何帮助!

示例1:

library(shiny)
library(progressr)

app <- shinyApp(
  ui = fluidPage(
    plotOutput("plot")
  ),
  
  server = function(input, output) {
    output$plot <- renderPlot({
      X <- 1:24
      future::plan(future::multisession(workers = 6))
      
      withProgressShiny(message = "Calculation in progress",
                        detail = "This may take a while ...", value = 0, {
                          p <- progressor(along = X)
                          y <- furrr::future_map(X, function(x) {
                            Sys.sleep(1)
                            p()
                          })
                        })
      plot(cars)
    })
  }
)
Run Code Online (Sandbox Code Playgroud)

示例2:

X <- 1:24 …
Run Code Online (Sandbox Code Playgroud)

multithreading future r shiny furrr

6
推荐指数
0
解决办法
356
查看次数

furrr 没有找到自己的包

我目前正在开发一个包,假设它被称为 myPack。我有一个名为 myFunc1 的函数和另一个名为 myFunc2 的函数,它看起来像这样:

myFunc2 <- function(x, parallel = FALSE) { 
if(parallel) future::plan(future::multiprocess)
values <- furrr::future_map(x, myFunc1)
values
}
Run Code Online (Sandbox Code Playgroud)

现在,如果我在不并行的情况下调用 myFunc2,它会起作用。但是,如果我使用 parallel = TRUE 调用它,则会出现以下错误:

Error: Unexpected result (of class ‘snow-try-error’ != ‘FutureResult’)
retrieved for MultisessionFuture future (label = ‘<none>’, expression = 
‘{; do.call(function(...) {; ...future.f.env <- environment(...future.f); 
if (!is.null(...future.f.env$`~`)) {; if 
(is_bad_rlang_tilde(...future.f.env$`~`)) {; ...future.f.env$`~` <- 
base::`~`; ...; }); }, args = future.call.arguments); }’): there is no 
package called 'myPack'. This suggests that the communication with 
MultisessionFuture worker …
Run Code Online (Sandbox Code Playgroud)

parallel-processing r package r-future furrr

5
推荐指数
1
解决办法
513
查看次数

如何在data.table中执行更快的列表列操作

由于内存(和速度)问题,我希望在data.table内进行一些计算,而不是在data.table外进行。

以下代码有100.000行,但我正在处理4000万行。

library(tictoc)
library(data.table) # version 1.11.8
library(purrr)
library(furrr)
plan(multiprocess)

veryfing_function <- function(vec1, vec2){
  vector <- as.vector(outer(vec1, vec2, paste0))
  split(vector, ceiling(seq_along(vector)/length(vec1)))
}


dt <- data.table(letters = replicate(1e6, sample(letters[1:5], 3, TRUE), simplify = FALSE),
                 numbers = replicate(1e6, sample(letters[6:10], 3, TRUE), simplify = FALSE))



tic()
result1 <- future_map2(dt$letters, dt$numbers, veryfing_function)
toc()


tic()
result2 <- mapply(veryfing_function, dt$letters, dt$numbers, SIMPLIFY = FALSE)
toc()



tic()
dt[, result := future_map2(letters, numbers, veryfing_function)]
toc()


tic()
dt[, result2 := mapply(veryfing_function, letters, numbers, SIMPLIFY = FALSE)]
toc()

Run Code Online (Sandbox Code Playgroud)

所有变体的输出都是相同的,并且符合预期。基准是:

26秒72秒38秒105秒,所以我看不到使用data.table内的函数或使用mapply的优势。 …

r mapply data.table furrr

5
推荐指数
1
解决办法
112
查看次数

当我使用“dplyr::mutate()”时,为什么“furrr::future_map_int()”比“purrr::map_int()”慢?

我有一个tibble,其中包含一个列表列,其中包含向量。我想创建一个新列来说明每个向量的长度。由于这个数据集很大(3M 行),我想使用该包来减少一些处理时间furrr。不过,看起来purrr比 更快furrr。怎么会?

为了演示这个问题,我首先模拟一些数据。不要费心去理解模拟部分的代码,因为它与问题无关。


数据模拟功能

library(stringi)
library(rrapply)
library(tibble)

simulate_data <- function(nrows) {
  split_func <- function(x, n) {
    unname(split(x, rep_len(1:n, length(x))))
  }
  
  randomly_subset_vec <- function(x) {
    sample(x, sample(length(x), 1))
  }
  
  tibble::tibble(
    col_a = rrapply(object = split_func(
      x = setNames(1:(nrows * 5),
                   stringi::stri_rand_strings(nrows * 5,
                                              2)),
      n = nrows
    ),
    f      = randomly_subset_vec),
    col_b = runif(nrows)
  )
  
} 
Run Code Online (Sandbox Code Playgroud)

模拟数据

set.seed(2021)

my_data <- simulate_data(3e6) # takes about 1 minute to run on my …
Run Code Online (Sandbox Code Playgroud)

r purrr furrr

5
推荐指数
1
解决办法
609
查看次数

Furrr / purrr 进度条与计算进度根本不同步

我想创建一个函数,它接受一个函数并对 a 中的每一行应用一次,tibble参数存储在 a 的相应命名列中tibble 我意识到这听起来有点奇怪,但我希望面向用户的函数/功能简单。

在大多数情况下,处理会花费很多时间,所以我真的更喜欢有进度条功能,这就是我发现很大麻烦的地方:

此代码有效(然后没有进度条):

library(tibble)
library(dplyr)
library(purrr)
library(furrr)
library(tidyr)
library(wrassp)
library(progressr)

xf <- function(x,trim,na.rm,ds="ded"){
  return(x*trim*na.rm)
}

xf2 <- function(x,trim,na.rm,ds="ded"){
  return(list("a"=x,"b"=trim))
}

xf3 <- function(x,trim,na.rm,ds="ded"){
  return(data.frame("a"=x,"b"=trim))
}

mymap <- function(f,...){
  plan(multisession)
  exDF <- tribble(
    ~x, ~trim, ~na.rm, ~notarg, ~listOfFiles, ~toFile,
    0.5, 0, TRUE, 11.2, "~/Desktop/a1.wav", FALSE, 
    0.4, 0.5, TRUE, 12, "~/Desktop/a1.wav", FALSE
  )

  dotArgs <- list(...)
  dotArgsRT <- as_tibble_row(dotArgs)

  dotArgsNames <- names(dotArgs)
  
  allArgsNames <- formalArgs(f)
  
  exDF %>% 
    select(-any_of(!!dotArgsNames)) %>%
    bind_cols(dotArgsRT) %>% 
    select(any_of(allArgsNames)) %>%
    rowwise() …
Run Code Online (Sandbox Code Playgroud)

r pmap purrr furrr

5
推荐指数
1
解决办法
356
查看次数

R:异步并行lapply

到目前为止,我发现lapply在 R 中使用并行的最简单方法是通过以下示例代码:

library(parallel)
library(pbapply)

cl <- makeCluster(10)
clusterExport(cl = cl, {...})
clusterEvalQ(cl = cl, {...})

results <- pblapply(1:100, FUN = function(x){rnorm(x)}, cl = cl)
Run Code Online (Sandbox Code Playgroud)

它有一个非常有用的功能,即为结果提供进度条,并且当​​不需要并行计算时,通过设置 可以很容易地重用相同的代码cl = NULL

然而,我注意到的一个问题是它pblapply正在批量循环遍历列表。例如,如果一个工人在某项任务上停留了很长时间,那么剩下的工人将等待该任务完成,然后再开始一批新的工作。对于某些任务,这会为工作流程增加大量不必要的时间。

我的问题: 是否有任何类似的并行框架允许工作人员独立运行?进度条和重用代码的能力cl=NULL将是一个很大的优势。

也许可以修改现有代码pbapply来添加此选项/功能?

parallel-processing r multiprocessing lapply furrr

4
推荐指数
1
解决办法
1548
查看次数

在数据帧行上运行 purrr::map_dfr?

dataframe给定一个iris默认值,如何配置purrr::map_dfr()函数在每一行上运行dataframe并执行函数foo

这是我的 df 的一行,请考虑到该值始终是一个大 JSON:

structure(list(Key = "2019/01/04/14/kuku@pupu.com_2ed026cb-8e9f-4392-9cc4-9f580b9d3aab_1345a5a4-3d5b-48a0-a678-67ed09a6f487_2019-01-04-14-52-43-537", 
    LastModified = "2019-01-04T14:52:44.000Z", ETag = "\"1c6269ab8b7baa85f0d2567de417f0d0\"", 
    Size = 35280, Owner = "e7c0d260939d15d18866126da3376642e2d4497f18ed762b608ed2307778bdf1", 
    StorageClass = "STANDARD", Bucket = "comp-kukupupu-streamed-data", 
    user_name = "kuku@pupu.com", value = list(---here goes a large json), 
    obs_id = 1137L), row.names = 1L, class = "data.frame")
Run Code Online (Sandbox Code Playgroud)

我的功能是:

extract_scroll_data <- function(df) {

  tryCatch({

    j <- fromJSON(unlist(df$value))

    if (is_empty(fromJSON(j$sensorsData)) | is_empty(fromJSON(j$eventList))) {

      return(tibble())

    } else {

      return(set_names(as_tibble(fromJSON(j$eventList, bigint_as_char = TRUE), 
                                 .name_repair = "unique"), 
                       nm …
Run Code Online (Sandbox Code Playgroud)

functional-programming r dataframe purrr furrr

2
推荐指数
1
解决办法
3008
查看次数

使用 furrr 进行 tidy 评估

我想让以下函数使用furrr包而不是purrr包并行运行。

library(furrr)
library(tidyverse)

input <- list(element1 = tibble::tibble(a = c(1, 2), b = c(2, 2)),
              element2 = tibble::tibble(a = c(1, 2), b = c(4, 4))
)

multiplier <- function(data, var1, var2){
  purrr::map_df(.x = data,
                .f = ~ .x %>% 
                  dplyr::mutate(product = {{var1}} * {{var2}})
  )
}

multiplier(input, a, b)
Run Code Online (Sandbox Code Playgroud)

但是,当我将其转换为furrr等效项时,出现错误。

multiplier_parallel <- function(data, var1, var2){
  furrr::future_map_dfr(.x = data,
                .f = ~ .x %>% 
                  dplyr::mutate(product = {{var1}} * {{var2}})
  )
}

future::plan(multiprocess)

multiplier_parallel(input, a, b) …
Run Code Online (Sandbox Code Playgroud)

future r purrr tidyeval furrr

2
推荐指数
1
解决办法
185
查看次数

如何正确使用 R future (furrr) 包中的集群计划

我目前正在使用furrr我的模型创建更有组织的执行。我使用 adata.frame以有序的方式将参数传递给函数,然后使用 afurrr::future_map()将函数映射到所有参数。在我的本地计算机 (OSX) 上使用顺序和多核 future 时,该函数可以完美运行。

现在,我想测试我的代码,创建我自己的 AWS 实例集群(正如此处所示)。

我使用链接的文章代码创建了一个函数:

make_cluster_ec2  <- function(public_ip){
  ssh_private_key_file  <-  Sys.getenv('PEM_PATH')
  github_pac  <-  Sys.getenv('PAC')

  cl_multi <- future::makeClusterPSOCK(
  workers = public_ip,
  user = "ubuntu",
  rshopts = c(
    "-o", "StrictHostKeyChecking=no",
    "-o", "IdentitiesOnly=yes",
    "-i", ssh_private_key_file
  ),
  rscript_args = c(
    "-e", shQuote("local({p <- Sys.getenv('R_LIBS_USER'); dir.create(p, recursive = TRUE, showWarnings = FALSE); .libPaths(p)})"),
    "-e", shQuote("install.packages('devtools')"),
    "-e", shQuote(glue::glue("devtools::install_github('user/repo', auth_token = '{github_pac}')"))
  ),
  dryrun = FALSE)

  return(cl_multi)

}
Run Code Online (Sandbox Code Playgroud)

然后,我创建集群对象,然后检查它是否连接到正确的实例

public_ids <- c('public_ip_1', 'public_ip_2') …
Run Code Online (Sandbox Code Playgroud)

parallel-processing r r-future furrr

1
推荐指数
1
解决办法
1361
查看次数