小编hen*_*dra的帖子

Python多处理设计

我编写了一种算法,它可以获取地理空间数据并执行许多步骤.输入数据是用于大光栅研究区域(约1.5亿像素)的多边形和协变量光栅的形状文件.步骤如下:

  1. 从shapefile的多边形内取样点
  2. 对于每个采样点,从协变量栅格中提取值
  3. 在采样点上建立预测模型
  4. 提取目标网格点的协变量
  5. 将预测模型应用于目标网格
  6. 将预测写入一组输出网格

整个过程需要多次迭代(比如说100),但是当按顺序处理时,每次迭代当前需要花费一个多小时.对于每次迭代,最耗时的部分是步骤4和5.因为目标网格太大,我一直在处理它一个块(比如说1000行).

我有一个带有32 Gb RAM的6核CPU,所以在每次迭代中,我都使用Python的multiprocessing模块和一个Pool对象来同时处理多个块(步骤4和5),然后写出输出(预测)使用调用全局输出写入函数的回调函数到公共输出网格集.这似乎有效,但并不比处理串行中的每个块更快(实际上,它可能更慢).

所以我的问题是,有更有效的方法吗?我对多处理模块的Queue类感兴趣,但我不确定它是如何工作的.例如,我想知道如果有一个执行步骤4和5的队列然后将结果传递给执行步骤6的另一个队列是否更有效.或者这甚至是Queue的用途?

任何指针将不胜感激.

python iteration multiprocessing gdal

18
推荐指数
1
解决办法
1579
查看次数

Python多处理进程以静默方式崩溃

我正在使用Python 2.7.3.我使用子类multiprocessing.Process对象并行化了一些代码.如果我的子类Process对象中的代码没有错误,那么一切运行正常.但是,如果有在我的子类的过程对象代码中的错误,他们显然会崩溃默默(没有堆栈跟踪打印到父shell)和CPU使用率将下降到零.父代码永远不会崩溃,给人的印象是执行只是挂起.同时,很难追踪代码中的错误,因为没有给出关于错误位置的指示.

我在stackoverflow上找不到任何其他问题来处理同样的问题.

我想子类化的Process对象似乎是静默崩溃的,因为它们无法向父shell发送错误消息,但我想知道我能做些什么,这样我至少可以更高效地调试(以及其他我的代码的用户可以告诉我他们何时遇到问题).

编辑:我的实际代码过于复杂,但在它的错误的子类的处理对象的一个简单的例子是这样的:

from multiprocessing import Process, Queue

class Worker(Process):

    def __init__(self, inputQueue, outputQueue):

        super(Worker, self).__init__()

        self.inputQueue = inputQueue
        self.outputQueue = outputQueue

    def run(self):

        for i in iter(self.inputQueue.get, 'STOP'):

            # (code that does stuff)

            1 / 0 # Dumb error

            # (more code that does stuff)

            self.outputQueue.put(result)
Run Code Online (Sandbox Code Playgroud)

python parallel-processing multiprocessing python-2.7

8
推荐指数
1
解决办法
1万
查看次数

R找不到功能"%dopar%"

在Windows 10的RStudio中,我编写了一个并行执行计算的函数,如下所示:

doSomething = function(a, b, c) {

    # Inner function that does the actual work when parallelised
    work = function (a, b, c) {
        # Do something
        e = func1(a, b)
        f = func2(c)
        result = e + f

        return(result)
    }

    # Carry out work in parallel
    cl = makeCluster(6)
    registerDoParallel(cl)
    output = foreach(i = 1:10, .packages=c("foo", "bar")) %dopar%
        work(a, b, c)
    stopCluster(cl)

    return(output)
}
Run Code Online (Sandbox Code Playgroud)

如果我从R脚本将函数加载到内存中,这可以正常工作; 但是,我想将它包含在我正在编写的包中.因此,在包文件中,我小心识别外部函数的命名空间并在DESCRIPTION文件中引用它们的包.例如:

doSomething = function(a, b, c) {

    # Inner function that …
Run Code Online (Sandbox Code Playgroud)

parallel-processing r

7
推荐指数
1
解决办法
3953
查看次数

如何从R的聚合函数中以正确的格式获取data.frame?

我在让R aggregate()函数以我想要的格式返回data.frame时遇到麻烦。

基本上我像这样运行聚合:

aggregate(df$res, list(full$depth), summary)
Run Code Online (Sandbox Code Playgroud)

其中res列包含TRUEFALSENA。我想res根据中的组计算每个值的出现次数,这些组depth是六个数字深度值0、5、15、30、60和100。根据聚合函数上的帮助页面,它会强制按值的因素,所以这应该不是问题(据我所知)。

因此,我运行了聚合函数并将其存储在data.frame中。这可以; 它运行没有错误。R控制台中显示的摘要如下所示:

  Group.1  x.Mode x.FALSE x.TRUE x.NA's
1       0 logical       3     83      0
2       5 logical       3     83      0
3      15 logical       8     78      0
4      30 logical       5     79      2
5      60 logical       1     64     21
6     100 logical       1     24     61
Run Code Online (Sandbox Code Playgroud)

同样,这很好,看起来像我想要的。但是包含结果data.frame实际上只有两列,看起来像这样:

    Group.1 x
1   0   logical
2   5   logical
3   15 …
Run Code Online (Sandbox Code Playgroud)

aggregate r dataframe

5
推荐指数
1
解决办法
3011
查看次数

rpy2代码可以并行运行吗?

我有一些Python代码通过rpy2将数据帧传递给R,然后R处理它并将生成的data.frame拉回到R作为PANDAS数据帧com.load_data.

问题是,调用com.load_data在单个Python进程中工作正常,但是当同时在多个multiprocessing.Process进程中运行同一堆代码时它会崩溃.我从Python中得到以下错误消息:

File "C:\\Python27\\lib\\site-packages\\pandas\\rpy\\common.py", line 29, in load_data
    r.data(name) TypeError: 'DataFrame' object is not callable'
Run Code Online (Sandbox Code Playgroud)

所以我的问题是,是不是rpy2实际上设计为能够并行运行,或者是它只是在一个错误load_data的功能?我只是假设每个Python进程都会获得自己独立的R会话.据我所知,唯一的解决方法是让R将输出写入文本文件,相应的Python进程可以打开并继续处理.但这非常笨重.

更新一些代码:

from rpy2.robjects.packages import importr
import rpy2.robjects as ro
import pandas as pd
import pandas.rpy.common as com

# Load C50 library into R environment
C50 = importr('C50')

...

# PANDAS data frame containing test dataset
testing = pd.DataFrame(testing)

# Pass testing dataset to R
rtesting = com.convert_to_r_dataframe(testing)
ro.globalenv['test'] = rtesting

# Strip "AsIs" from …
Run Code Online (Sandbox Code Playgroud)

python r rpy2

1
推荐指数
1
解决办法
1215
查看次数