如何使用foreach克服内存限制

Sam*_*amo 5 memory parallel-processing foreach memory-management r

我正在尝试处理> 10000 xts保存在磁盘上的对象,每个加载到R时大约为0.2 GB.我想使用foreach并行处理这些对象.我的代码适用于100 xts对象,我在内存中预先加载,导出等.但是在> 100 xts对象后,我在我的机器上达到了内存限制.

我想要做的例子:

require(TTR)
require(doMPI)
require(foreach)

test.data <- runif(n=250*10*60*24)

xts.1 <- xts(test.data, order.by=as.Date(1:length(test.data)))
xts.1 <- cbind(xts.1, xts.1, xts.1, xts.1, xts.1, xts.1)

colnames(xts.1) <- c("Open", "High", "Low", "Close", "Volume", "Adjusted")

print(object.size(xts.1), units="Gb")

xts.2 <- xts.1
xts.3 <- xts.1
xts.4 <- xts.1

save(xts.1, file="xts.1.rda")
save(xts.2, file="xts.2.rda")
save(xts.3, file="xts.3.rda")
save(xts.4, file="xts.4.rda")

names <- c("xts.1", "xts.2", "xts.3", "xts.4")

rm(xts.1)
rm(xts.2)
rm(xts.3)
rm(xts.4)

cl <- startMPIcluster(count=2) # Use 2 cores
registerDoMPI(cl)

result <- foreach(name=names, 
                  .combine=cbind, 
                  .multicombine=TRUE, 
                  .inorder=FALSE, 
                  .packages=c("TTR")) %dopar% {
    # TODO: Move following line out of worker. One (or 5, 10,
    # 20, ... but not all) object at a time should be loaded 
    # by master and exported to worker "just in time"
    load(file=paste0(name, ".rda"))

    return(last(SMA(get(name)[, 1], 10)))
}

closeCluster(cl)

print(result)
Run Code Online (Sandbox Code Playgroud)

所以我想知道我是如何能够加载每个(或几个像5,10,20,100,但不是所有的一次)xts对象从磁盘"及时"之前发送/需要/出口给工人.我无法在worker中加载对象(基于存储在磁盘上的名称和文件夹),因为工作人员可以在远程计算机上无法访问存储在磁盘上的对象的文件夹.所以我需要能够在主要过程中"及时"读取/加载它们......

我使用doMPI和doRedis作为并行后端.doMPI看起来更有效,但比doRedis慢(在100个对象上).

所以我想了解解决这个问题的正确"策略"/"模式"是什么.

Ste*_*ton 4

除了使用 doMPI 或 doRedis 之外,您还需要编写一个返回适当迭代器的函数。我的迭代器包中的小插图“编写自定义迭代器”中有许多示例应该会有所帮助,但这里是对此类函数的快速尝试:

ixts <- function(xtsnames) {
  it <- iter(xtsnames)

  nextEl <- function() {
    xtsname <- nextElem(it)  # throws "StopIteration"
    load(file=paste0(xtsname, ".rda"))
    get(xtsname)
  }

  obj <- list(nextElem=nextEl)
  class(obj) <- c('ixts', 'abstractiter', 'iter')
  obj
}
Run Code Online (Sandbox Code Playgroud)

这非常简单,因为它基本上是“names”变量上迭代器的包装器。插图中的几个示例都使用了这种技术。

您可以将“ixts”与 foreach 一起使用,如下所示:

result <- foreach(xts=ixts(names),
                  .combine=cbind, 
                  .multicombine=TRUE, 
                  .inorder=FALSE, 
                  .packages=c("TTR")) %dopar% {
    last(SMA(xts[, 1], 10))
}
Run Code Online (Sandbox Code Playgroud)

尽管此迭代器适用于任何 foreach 后端,但并非所有后端都会调用它just-in-time。doMPI 和 doRedis 会,但 doParallel 和 doMC 会预先从迭代器获取所有值,因为 clusterApplyLB 和 mclapply 要求这些值全部位于列表中。doMPI 和 doRedis 被设计为与迭代器一起使用,以提高内存效率。