我编写了一种算法,它可以获取地理空间数据并执行许多步骤.输入数据是用于大光栅研究区域(约1.5亿像素)的多边形和协变量光栅的形状文件.步骤如下:
整个过程需要多次迭代(比如说100),但是当按顺序处理时,每次迭代当前需要花费一个多小时.对于每次迭代,最耗时的部分是步骤4和5.因为目标网格太大,我一直在处理它一个块(比如说1000行).
我有一个带有32 Gb RAM的6核CPU,所以在每次迭代中,我都使用Python的multiprocessing
模块和一个Pool
对象来同时处理多个块(步骤4和5),然后写出输出(预测)使用调用全局输出写入函数的回调函数到公共输出网格集.这似乎有效,但并不比处理串行中的每个块更快(实际上,它可能更慢).
所以我的问题是,有更有效的方法吗?我对多处理模块的Queue
类感兴趣,但我不确定它是如何工作的.例如,我想知道如果有一个执行步骤4和5的队列然后将结果传递给执行步骤6的另一个队列是否更有效.或者这甚至是Queue的用途?
任何指针将不胜感激.
我正在使用Python 2.7.3.我使用子类multiprocessing.Process
对象并行化了一些代码.如果我的子类Process对象中的代码没有错误,那么一切运行正常.但是,如果有在我的子类的过程对象代码中的错误,他们显然会崩溃默默(没有堆栈跟踪打印到父shell)和CPU使用率将下降到零.父代码永远不会崩溃,给人的印象是执行只是挂起.同时,很难追踪代码中的错误,因为没有给出关于错误位置的指示.
我在stackoverflow上找不到任何其他问题来处理同样的问题.
我想子类化的Process对象似乎是静默崩溃的,因为它们无法向父shell发送错误消息,但我想知道我能做些什么,这样我至少可以更高效地调试(以及其他我的代码的用户可以告诉我他们何时遇到问题).
编辑:我的实际代码过于复杂,但在它的错误的子类的处理对象的一个简单的例子是这样的:
from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self, inputQueue, outputQueue):
super(Worker, self).__init__()
self.inputQueue = inputQueue
self.outputQueue = outputQueue
def run(self):
for i in iter(self.inputQueue.get, 'STOP'):
# (code that does stuff)
1 / 0 # Dumb error
# (more code that does stuff)
self.outputQueue.put(result)
Run Code Online (Sandbox Code Playgroud) 在Windows 10的RStudio中,我编写了一个并行执行计算的函数,如下所示:
doSomething = function(a, b, c) {
# Inner function that does the actual work when parallelised
work = function (a, b, c) {
# Do something
e = func1(a, b)
f = func2(c)
result = e + f
return(result)
}
# Carry out work in parallel
cl = makeCluster(6)
registerDoParallel(cl)
output = foreach(i = 1:10, .packages=c("foo", "bar")) %dopar%
work(a, b, c)
stopCluster(cl)
return(output)
}
Run Code Online (Sandbox Code Playgroud)
如果我从R脚本将函数加载到内存中,这可以正常工作; 但是,我想将它包含在我正在编写的包中.因此,在包文件中,我小心识别外部函数的命名空间并在DESCRIPTION
文件中引用它们的包.例如:
doSomething = function(a, b, c) {
# Inner function that …
Run Code Online (Sandbox Code Playgroud) 我在让R aggregate()
函数以我想要的格式返回data.frame时遇到麻烦。
基本上我像这样运行聚合:
aggregate(df$res, list(full$depth), summary)
Run Code Online (Sandbox Code Playgroud)
其中res
列包含TRUE
,FALSE
和NA
。我想res
根据中的组计算每个值的出现次数,这些组depth
是六个数字深度值0、5、15、30、60和100。根据聚合函数上的帮助页面,它会强制按值的因素,所以这应该不是问题(据我所知)。
因此,我运行了聚合函数并将其存储在data.frame中。这可以; 它运行没有错误。R控制台中显示的摘要如下所示:
Group.1 x.Mode x.FALSE x.TRUE x.NA's
1 0 logical 3 83 0
2 5 logical 3 83 0
3 15 logical 8 78 0
4 30 logical 5 79 2
5 60 logical 1 64 21
6 100 logical 1 24 61
Run Code Online (Sandbox Code Playgroud)
同样,这很好,看起来像我想要的。但是包含结果的data.frame实际上只有两列,看起来像这样:
Group.1 x
1 0 logical
2 5 logical
3 15 …
Run Code Online (Sandbox Code Playgroud) 我有一些Python代码通过rpy2将数据帧传递给R,然后R处理它并将生成的data.frame拉回到R作为PANDAS数据帧com.load_data
.
问题是,调用com.load_data
在单个Python进程中工作正常,但是当同时在多个multiprocessing.Process
进程中运行同一堆代码时它会崩溃.我从Python中得到以下错误消息:
File "C:\\Python27\\lib\\site-packages\\pandas\\rpy\\common.py", line 29, in load_data
r.data(name) TypeError: 'DataFrame' object is not callable'
Run Code Online (Sandbox Code Playgroud)
所以我的问题是,是不是rpy2实际上设计为能够并行运行,或者是它只是在一个错误load_data
的功能?我只是假设每个Python进程都会获得自己独立的R会话.据我所知,唯一的解决方法是让R将输出写入文本文件,相应的Python进程可以打开并继续处理.但这非常笨重.
更新一些代码:
from rpy2.robjects.packages import importr
import rpy2.robjects as ro
import pandas as pd
import pandas.rpy.common as com
# Load C50 library into R environment
C50 = importr('C50')
...
# PANDAS data frame containing test dataset
testing = pd.DataFrame(testing)
# Pass testing dataset to R
rtesting = com.convert_to_r_dataframe(testing)
ro.globalenv['test'] = rtesting
# Strip "AsIs" from …
Run Code Online (Sandbox Code Playgroud)