Rya*_*son 15 parallel-processing r vectorization
我有一个非常大的列表X和一个矢量化函数f.我想计算f(X),但如果我用一个核心来做这个将需要很长时间.我有(访问)48核服务器.并行计算的最简单方法是f(X)什么?以下不是正确的答案:
library(foreach)
library(doMC)
registerDoMC()
foreach(x=X, .combine=c) %dopar% f(x)
Run Code Online (Sandbox Code Playgroud)
上面的代码确实会对计算进行并行化f(X),但它会通过f单独应用于每个元素来实现X.这忽略了矢量化的性质,f并且可能会使事情变慢,而不是更快.而不是应用felementwise X,我想X分成合理大小的块并应用于f那些.
那么,我应该手动拆分X成48个相等大小的子列表然后f并行应用于每个子列表,然后手动将结果放在一起?或者是否有为此设计的包装?
如果有人想知道,我的具体用例就在这里.
这是我的实现。它是一个函数chunkmap,它接受一个向量化函数、一个应该向量化的参数列表和一个不应该向量化的参数列表(即常量),并返回与直接在参数上调用该函数相同的结果,除了结果是并行计算的。对于函数f、向量参数v1、v2、v3和标量参数s1, s2,以下内容应返回相同的结果:
f(a=v1, b=v2, c=v3, d=s1, e=s2)
f(c=v3, b=v2, e=s2, a=v1, d=s1)
chunkapply(FUN=f, VECTOR.ARGS=list(a=v1, b=v2, c=v3), SCALAR.ARGS=list(d=s1, e=s2))
chunkapply(FUN=f, SCALAR.ARGS=list(e=s2, d=s1), VECTOR.ARGS=list(a=v1, c=v3, b=v2))
Run Code Online (Sandbox Code Playgroud)
由于函数不可能chunkapply知道哪些参数f是矢量化的,哪些不是,所以您需要在调用它时指定,否则您将得到错误的结果。您通常应该命名您的参数以确保它们正确绑定。
library(foreach)
library(iterators)
# Use your favorite doPar backend here
library(doMC)
registerDoMC()
get.chunk.size <- function(vec.length,
min.chunk.size=NULL, max.chunk.size=NULL,
max.chunks=NULL) {
if (is.null(max.chunks)) {
max.chunks <- getDoParWorkers()
}
size <- vec.length / max.chunks
if (!is.null(max.chunk.size)) {
size <- min(size, max.chunk.size)
}
if (!is.null(min.chunk.size)) {
size <- max(size, min.chunk.size)
}
num.chunks <- ceiling(vec.length / size)
actual.size <- ceiling(vec.length / num.chunks)
return(actual.size)
}
ichunk.vectors <- function(vectors=NULL,
min.chunk.size=NULL,
max.chunk.size=NULL,
max.chunks=NULL) {
## Calculate number of chunks
recycle.length <- max(sapply(vectors, length))
actual.chunk.size <- get.chunk.size(recycle.length, min.chunk.size, max.chunk.size, max.chunks)
num.chunks <- ceiling(recycle.length / actual.chunk.size)
## Make the chunk iterator
i <- 1
it <- idiv(recycle.length, chunks=num.chunks)
nextEl <- function() {
n <- nextElem(it)
ix <- seq(i, length = n)
i <<- i + n
vchunks <- foreach(v=vectors) %do% v[1+ (ix-1) %% length(v)]
names(vchunks) <- names(vectors)
vchunks
}
obj <- list(nextElem = nextEl)
class(obj) <- c("ichunk", "abstractiter", "iter")
obj
}
chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), MERGE=TRUE, ...) {
## Check that the arguments make sense
stopifnot(is.list(VECTOR.ARGS))
stopifnot(length(VECTOR.ARGS) >= 1)
stopifnot(is.list(SCALAR.ARGS))
## Choose appropriate combine function
if (MERGE) {
combine.fun <- append
} else {
combine.fun <- foreach:::defcombine
}
## Chunk and apply, and maybe merge
foreach(vchunk=ichunk.vectors(vectors=VECTOR.ARGS, ...),
.combine=combine.fun,
.options.multicore = mcoptions) %dopar%
{
do.call(FUN, args=append(vchunk, SCALAR.ARGS))
}
}
## Only do chunkapply if it will run in parallel
maybe.chunkapply <- function(FUN, VECTOR.ARGS, SCALAR.ARGS=list(), ...) {
if (getDoParWorkers() > 1) {
chunkapply(FUN, VECTOR.ARGS, SCALAR.ARGS, ...)
} else {
do.call(FUN, append(VECTOR.ARGS, SCALAR.ARGS))
}
}
Run Code Online (Sandbox Code Playgroud)
以下是一些示例,显示chunkapply(f,list(x))产生与 相同的结果f(x)。我将 max.chunk.size 设置得非常小,以确保实际使用分块算法。
> # Generate all even integers from 2 to 100 inclusive
> identical(chunkapply(function(x,y) x*y, list(1:50), list(2), max.chunk.size=10), 1:50 * 2)
[1] TRUE
> ## Sample from a standard normal distribution, then discard values greater than 1
> a <- rnorm(n=100)
> cutoff <- 1
> identical(chunkapply(function(x,limit) x[x<=limit], list(x=a), list(limit=cutoff), max.chunk.size=10), a[a<cutoff])
[1] TRUE
Run Code Online (Sandbox Code Playgroud)
如果有人有比“chunkapply”更好的名字,请推荐。
正如另一个答案指出的那样,pvec多核包中调用了一个函数,它的功能与我编写的功能非常相似。对于简单的情况,您应该这样做,并且您应该投票赞成 Jonas Rauch 的答案。但是,我的函数有点通用,因此如果以下任何一项适用于您,您可能需要考虑使用我的函数:
pvec仅对单个参数进行向量化,因此您无法轻松地使用 来实现并行向量化加法pvec。我的函数允许您指定任意参数。itertools 包就是为了解决此类问题而设计的。在这种情况下,我会使用isplitVector:
n <- getDoParWorkers()
foreach(x=isplitVector(X, chunks=n), .combine='c') %dopar% f(x)
Run Code Online (Sandbox Code Playgroud)
对于这个例子来说,pvec无疑更快更简单,但是这可以在 Windows 上使用 doParallel 包。
| 归档时间: |
|
| 查看次数: |
1682 次 |
| 最近记录: |