Axe*_*971 3 parallel-processing r cluster-computing r-future
我想for
在集群的节点(几台机器)内分配作业(带有循环)。我尝试使用 R 包future
来做到这一点。我不知道这是否是最好的方法;我尝试使用foreach
该doParallel
包,但没有成功。如何判断循环迭代次数何时大于集群节点数?
library(doParallel);
library(doFuture);
#library(future);
registerDoFuture();
workers <- c(rep("129.20.25.61",1), rep("129.20.25.217",1));
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "", verbose = FALSE);
plan(cluster, workers = cl)
mu <- 1.0
sigma <- 2.0
for(i in 1:3){
res %<-%{ rnorm(i, mean = mu, sd = sigma)}
print(i);
}
Run Code Online (Sandbox Code Playgroud)
如果您使用普通的 Future API,即future()
+value()
或%<-%
,则无需涉及 foreach、doFuture 等。以下是如何单独使用 Future API 以及您可以期望的输出:
(A) 设置工人
library("future")
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
### starting worker pid=20026 on localhost:11900 at 11:47:28.334
### starting worker pid=12291 on localhost:11901 at 11:47:37.172
print(cl)
### socket cluster with 2 nodes on hosts '129.20.25.61', '129.20.25.217'
plan(cluster, workers = cl)
Run Code Online (Sandbox Code Playgroud)
(B) 显式 Future API
在这里,我们显式使用创建一个 futures 列表,并使用(基本上等于调用)future()
检索它们的值。values()
lapply(f, FUN = value)
mu <- 1.0
sigma <- 2.0
f <- list()
for (i in 1:3) {
f[[i]] <- future({ rnorm(i, mean = mu, sd = sigma) })
}
v <- values(f)
str(v)
### List of 3
### $ : num 3.25
### $ : num [1:2] 3.24 3.29
### $ : num [1:3] 1.251 2.299 0.923
Run Code Online (Sandbox Code Playgroud)
(C) 隐式 Future API
在这种替代方案中,我们使用 future 赋值运算符隐式创建 futures %<-%
(这在内部会执行future()
,然后value()
当您尝试访问 future 的值时)。由于%<-%
只能分配给环境(而不是列表、data.frames 等),因此我们需要使用一个作为环境的容器。这里我使用listenv类,它是一个环境,但允许您将其索引为列表。
library("listenv") ## listenv()
mu <- 1.0
sigma <- 2.0
v <- listenv()
for (i in 1:3) {
v[[i]] %<-% { rnorm(i, mean = mu, sd = sigma) }
}
v <- as.list(v)
str(v)
### List of 3
### $ : num 1.15
### $ : num [1:2] 2.2277 -0.0164
### $ : num [1:3] -2.09 3.34 -1.09
Run Code Online (Sandbox Code Playgroud)
(四) 使用future_lapply()
如果您更喜欢lapply()
类似的方法,您可以这样做:
v <- future_lapply(1:3, FUN = function(i) {
rnorm(i, mean = mu, sd = sigma)
})
str(v)
### List of 3
### $ : num 2.12
### $ : num [1:2] 2.56 -1.21
### $ : num [1:3] 2.89 -0.159 -0.983
Run Code Online (Sandbox Code Playgroud)
(四) 使用foreach()
如果您想使用foreach()
,那么您可以执行以下操作。foreach()
请注意,在使用foreach 的每个设计时,最好始终显式导出全局变量- 但是,如果您始终使用doFuture
它实际上是不需要的。
library("doFuture")
registerDoFuture()
workers <- c("129.20.25.61", "129.20.25.217")
cl <- makeClusterPSOCK(workers, revtunnel = TRUE, outfile = "")
plan(cluster, workers = cl)
v <- foreach(i = 1:3, .export = c("mu", "sigma")) %dopar% {
rnorm(i, mean = mu, sd = sigma)
}
str(v)
### List of 3
### $ : num 3.12
### $ : num [1:2] -0.0887 -2.8016
### $ : num [1:3] 2.15 3.5 -2.24
Run Code Online (Sandbox Code Playgroud)
如何判断循环迭代次数何时高于集群节点数?
我不确定你在这里问什么。您是否担心同时运行的 future 数量多于您的工人数量?如果是这样,则会自动处理。如果所有工人都被占用,那么额外的 future 的创建将被阻止,直到其中一名工人再次可用为止。