我正在使用raster
package 并尝试切换到,terra
但由于某些我不明白的原因,无法重现与和等包并行工作时terra
的相同操作。这是一个可重现的示例。raster
snowfall
future.apply
library(terra)
r <- rast()
r[] <- 1:ncell(r)
m <- rast()
m[] <- c(rep(1,ncell(m)/5),rep(2,ncell(m)/5),rep(3,ncell(m)/5),rep(4,ncell(m)/5),rep(5,ncell(m)/5))
ms <- separate(m,other=NA)
plot(ms)
mymask <- function(ind){
tipo <- tipo_tav[ind]
mask <- ms[[ind]]
masked <-
terra::mask(
r,
mask
)
richard <- function(x){
k <-0.2
v <-0.3
a <-200
y0 <-2
y <- k/v*x*(1-((x/a)^v))+y0
return(y)
}
pred <- richard(masked)
pred <- clamp(pred,lower=0)
return(pred)
}
#the sequential usage works fine, faster than the `raster` counterpart
system.time(x <- mymask(1))#0.03 …
Run Code Online (Sandbox Code Playgroud) 最近,我一直在使用future
(andfuture.apply
和furrr
) 在 R 中进行一些并行处理,这大部分都很棒,但我偶然发现了一些我无法解释的东西。这可能是某个地方的错误,但也可能是我编码的草率。如果有人能解释这种行为,我们将不胜感激。
我正在对数据的不同子组运行模拟。对于每个组,我想运行模拟n
时间,然后计算结果的一些汇总统计数据。以下是一些示例代码,用于重现我的基本设置并演示我所看到的问题:
library(tidyverse)
library(future)
library(future.apply)
# Helper functions
#' Calls out to `free` to get total system memory used
sys_used <- function() {
.f <- system2("free", "-b", stdout = TRUE)
as.numeric(unlist(strsplit(.f[2], " +"))[3])
}
#' Write time, and memory usage to log file in CSV format
#' @param .f the file to write to
#' @param .id identifier for the row to be written
mem_string <- function(.f, .id) …
Run Code Online (Sandbox Code Playgroud) 我习惯future_lapply()
在 Linux 机器上并行我的代码。如果我提前终止进程,则只有一个工作线程被释放,并且并行进程将继续存在。我知道我可以输入tools::pskill(PID)
结束每个单独的进程,但这很乏味,因为我在 26 个核心上运行。
是否有办法从 R 对 linux 进行系统调用来获取所有活动的 PID?
future_lapply
我是这样设置的:
# set number of workers
works <- 26
plan(multiprocess, workers = works)
future_lapply(datas, function(data) {
# do some long processes
}
Run Code Online (Sandbox Code Playgroud)
因为我的并行会话仍在运行。
更新会话信息:
version.string R version 3.6.2 (2019-12-12)
future 1.12.0
future.apply 1.2.0
我目前正在开发一个包,假设它被称为 myPack。我有一个名为 myFunc1 的函数和另一个名为 myFunc2 的函数,它看起来像这样:
myFunc2 <- function(x, parallel = FALSE) {
if(parallel) future::plan(future::multiprocess)
values <- furrr::future_map(x, myFunc1)
values
}
Run Code Online (Sandbox Code Playgroud)
现在,如果我在不并行的情况下调用 myFunc2,它会起作用。但是,如果我使用 parallel = TRUE 调用它,则会出现以下错误:
Error: Unexpected result (of class ‘snow-try-error’ != ‘FutureResult’)
retrieved for MultisessionFuture future (label = ‘<none>’, expression =
‘{; do.call(function(...) {; ...future.f.env <- environment(...future.f);
if (!is.null(...future.f.env$`~`)) {; if
(is_bad_rlang_tilde(...future.f.env$`~`)) {; ...future.f.env$`~` <-
base::`~`; ...; }); }, args = future.call.arguments); }’): there is no
package called 'myPack'. This suggests that the communication with
MultisessionFuture worker …
Run Code Online (Sandbox Code Playgroud) 我目前正在使用 future 包进行并行化,如下所示:
plan(multisession, gc = TRUE)
standardised_addresses1 <- future_lapply(1:20000, function(x) x*x)
Run Code Online (Sandbox Code Playgroud)
问题是它使用了服务器上的所有 CPU。我想通过设置如下参数来限制使用的CPU数量:workers = 18
我正在尝试创建一个闪亮的下载处理程序,但使用 future_promise() 因为写入文件可能需要一些时间。这是我想做的一个工作示例,但不使用异步框架:
一个有效的 .Rmd 闪亮应用程序:当您单击按钮时,它会将 10 个随机偏差写入文件并提供下载。我添加了 5 秒的延迟。
---
title: "download, no futures"
runtime: shiny
output: html_document
---
```{r setup, include=FALSE}
library(dplyr)
knitr::opts_chunk$set(echo = FALSE)
```
This version works.
```{r}
renderUI({
button_reactive <- reactive({
y = rnorm(10)
Sys.sleep(5)
tf = tempfile(fileext = ".txt")
cat(c(y,'\n'), sep='\n', file = tf)
d = readBin(con = tf, what = "raw", n = file.size(tf))
return(list(fn = basename(tf), d = d))
})
output$button <- downloadHandler(
filename = function() {
button_reactive() %>%
`[[`('fn')
},
content …
Run Code Online (Sandbox Code Playgroud) 我想for
在集群的节点(几台机器)内分配作业(带有循环)。我尝试使用 R 包future
来做到这一点。我不知道这是否是最好的方法;我尝试使用foreach
该doParallel
包,但没有成功。如何判断循环迭代次数何时大于集群节点数?
library(doParallel);
library(doFuture);
#library(future);
registerDoFuture();
workers <- c(rep("129.20.25.61",1), rep("129.20.25.217",1));
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "", verbose = FALSE);
plan(cluster, workers = cl)
mu <- 1.0
sigma <- 2.0
for(i in 1:3){
res %<-%{ rnorm(i, mean = mu, sd = sigma)}
print(i);
}
Run Code Online (Sandbox Code Playgroud) 我使用检查点包进行可重复的数据分析。有些计算需要很长时间才能计算,所以我想并行运行它们。然而,当并行运行时,检查点未在工作线程上设置,因此我收到一条错误消息“没有名为 xy 的包”(因为它没有安装在我的默认库目录中)。
我如何确保每个工作人员都使用检查点文件夹中的包版本?我尝试在 foreach 代码中设置 .libPaths 但这似乎不起作用。我还希望在全局范围内设置检查点/libPaths 一次,而不是在每个 foreach 调用中设置一次。
另一种选择可能是更改 .Rprofile 文件,但我不想这样做。
checkpoint::checkpoint("2018-06-01")
library(foreach)
library(doFuture)
library(future)
doFuture::registerDoFuture()
future::plan("multisession")
l <- .libPaths()
# Code to run in parallel does not make much sense of course but I wanted to keep it simple.
res <- foreach::foreach(
x = unique(iris$Species),
lib.path = l
) %dopar% {
.libPaths(lib.path)
stringr::str_c(x, "_")
}
Run Code Online (Sandbox Code Playgroud)
{ 中的错误:任务 2 失败 - “没有名为‘stringr’的包”
我目前正在使用furrr
我的模型创建更有组织的执行。我使用 adata.frame
以有序的方式将参数传递给函数,然后使用 afurrr::future_map()
将函数映射到所有参数。在我的本地计算机 (OSX) 上使用顺序和多核 future 时,该函数可以完美运行。
现在,我想测试我的代码,创建我自己的 AWS 实例集群(正如此处所示)。
我使用链接的文章代码创建了一个函数:
make_cluster_ec2 <- function(public_ip){
ssh_private_key_file <- Sys.getenv('PEM_PATH')
github_pac <- Sys.getenv('PAC')
cl_multi <- future::makeClusterPSOCK(
workers = public_ip,
user = "ubuntu",
rshopts = c(
"-o", "StrictHostKeyChecking=no",
"-o", "IdentitiesOnly=yes",
"-i", ssh_private_key_file
),
rscript_args = c(
"-e", shQuote("local({p <- Sys.getenv('R_LIBS_USER'); dir.create(p, recursive = TRUE, showWarnings = FALSE); .libPaths(p)})"),
"-e", shQuote("install.packages('devtools')"),
"-e", shQuote(glue::glue("devtools::install_github('user/repo', auth_token = '{github_pac}')"))
),
dryrun = FALSE)
return(cl_multi)
}
Run Code Online (Sandbox Code Playgroud)
然后,我创建集群对象,然后检查它是否连接到正确的实例
public_ids <- c('public_ip_1', 'public_ip_2') …
Run Code Online (Sandbox Code Playgroud)