相关疑难解决方法(0)

多处理中的共享内存对象

假设我有一个大内存numpy数组,我有一个函数func,它接受这个巨大的数组作为输入(连同一些其他参数).func具有不同参数可以并行运行.例如:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]
Run Code Online (Sandbox Code Playgroud)

如果我使用多处理库,那么这个巨型数组将被多次复制到不同的进程中.

有没有办法让不同的进程共享同一个数组?此数组对象是只读的,永远不会被修改.

更复杂的是,如果arr不是一个数组,而是一个任意的python对象,有没有办法分享它?

[EDITED]

我读了答案,但我仍然有点困惑.由于fork()是copy-on-write,因此在python多处理库中生成新进程时不应调用任何额外的成本.但是下面的代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = …
Run Code Online (Sandbox Code Playgroud)

python parallel-processing numpy shared-memory multiprocessing

112
推荐指数
4
解决办法
10万
查看次数

在共享内存中使用numpy数组进行多处理

我想在共享内存中使用numpy数组与多处理模块一起使用.困难是使用它像一个numpy数组,而不仅仅是一个ctypes数组.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]
Run Code Online (Sandbox Code Playgroud)

这会产生如下输出:

Originally, the first two elements of arr = …
Run Code Online (Sandbox Code Playgroud)

python shared numpy multiprocessing

95
推荐指数
6
解决办法
6万
查看次数

可以在两个独立的进程之间共享内存数据吗?

我有一个使用Twisted的xmlrpc服务器.服务器有大量的数据存储在内存中.是否可以运行一个辅助的单独的xmlrpc服务器,它可以访问第一个服务器中的内存中的对象?

因此,serverA启动并创建一个对象.serverB启动并可以从serverA中的对象读取.

*编辑*

要共享的数据是一百万个元组的列表.

python

59
推荐指数
4
解决办法
5万
查看次数

如何在python子进程之间传递大型numpy数组而不保存到磁盘?

有没有一种很好的方法可以在不使用磁盘的情况下在两个python子进程之间传递大量数据?这是我希望完成的动画示例:

import sys, subprocess, numpy

cmdString = """
import sys, numpy

done = False
while not done:
    cmd = raw_input()
    if cmd == 'done':
        done = True
    elif cmd == 'data':
        ##Fake data. In real life, get data from hardware.
        data = numpy.zeros(1000000, dtype=numpy.uint8)
        data.dump('data.pkl')
        sys.stdout.write('data.pkl' + '\\n')
        sys.stdout.flush()"""

proc = subprocess.Popen( #python vs. pythonw on Windows?
    [sys.executable, '-c %s'%cmdString],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)

for i in range(3):
    proc.stdin.write('data\n')
    print proc.stdout.readline().rstrip()
    a = numpy.load('data.pkl')
    print a.shape

proc.stdin.write('done\n')
Run Code Online (Sandbox Code Playgroud)

这将创建一个子进程,该子进程生成numpy数组并将数组保存到磁盘.然后父进程从磁盘加载数组.有用!

问题是,我们的硬件可以生成比磁盘可读/写快10倍的数据.有没有办法将数据从一个python进程传输到另一个纯内存中,甚至可能没有复制数据?我可以做一些像传递参考的东西吗?

我第一次尝试纯粹在内存中传输数据是非常糟糕的:

import …
Run Code Online (Sandbox Code Playgroud)

python ctypes subprocess numpy pass-by-reference

26
推荐指数
3
解决办法
1万
查看次数

numpy与多处理和mmap

我正在使用Python的multiprocessing模块并行处理大型numpy数组.numpy.load(mmap_mode='r')在主进程中使用内存映射数组.在那之后,multiprocessing.Pool()分叉过程(我推测).

一切似乎都很好,除了我得到的行:

AttributeError("'NoneType' object has no attribute 'tell'",)
  in `<bound method memmap.__del__ of
       memmap([ 0.57735026,  0.57735026,  0.57735026,  0.        ,  0.        ,        0.        ,  0.        ,  0.        ,  0.        ,  0.        ,        0.        ,  0.        ], dtype=float32)>`
     ignored
Run Code Online (Sandbox Code Playgroud)

在unittest日志中.尽管如此,测试仍然没有通过.

知道那里发生了什么吗?

使用Python 2.7.2,OS X,NumPy 1.6.1.


更新:

经过一些调试后,我把原因归结为一个代码路径,该代码路径使用这个内存映射的numpy数组(一小部分)作为Pool.imap调用的输入.

显然,"问题"是multiprocessing.Pool.imap通过将输入传递给新进程的方式:它使用pickle.这不适用于mmaped numpy数组,而内部的某些内容会导致错误.

我发现Robert Kern的回复似乎解决了同样的问题.他建议为imap输入来自内存映射数组时创建一个特殊的代码路径:在生成的进程中手动映射同一个数组.

这将是如此复杂和丑陋,我宁愿忍受错误和额外的内存副本.有没有其他方法可以更轻松地修改现有代码?

python mmap numpy multiprocessing

20
推荐指数
1
解决办法
6875
查看次数

分别在并行进程中更改不同的python对象

简而言之

我想同时更改复杂的python对象,每个对象只由一个进程处理.我怎么能这样做(效率最高)?实施某种酸洗支持会有帮助吗?这会有效吗?

完整的问题

我有一个python数据结构ArrayDict,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行.在我的例子中,所有键都是整数.

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               #12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) #true
Run Code Online (Sandbox Code Playgroud)

现在我有多个这样的ArrayDicts并希望填写它们myMethod(arrayDict, params).由于myMethod价格昂贵,我想并行运行它.请注意,myMethod可能会添加许多行arrayDict.每个过程都会改变自己的过程ArrayDict.我不需要并发访问ArrayDicts.

myMethod,我更改了条目arrayDict(即,我更改了内部numpy数组),我添加了条目arrayDict(也就是说,我向字典添加另一个索引并在内部数组中写入一个新值).最终,我希望能够在arrayDict内部numpy阵列变得太小时进行交换.这不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作.即使没有阵列交换,我自己的尝试也没有成功.

我花了几天时间研究共享内存和python的多处理模块.由于我最终将在linux上工作,因此任务似乎相当简单:系统调用fork()允许有效地处理参数的副本.我的想法是ArrayDict在自己的进程中更改每个,返回对象的更改版本,并覆盖原始对象.为了节省内存并保存复制工作,我还使用了sharedmem数组来存储数据ArrayDict.我知道字典必须仍然被复制.

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of …
Run Code Online (Sandbox Code Playgroud)

python fork pickle shared-memory python-multiprocessing

10
推荐指数
1
解决办法
1117
查看次数

多处理在python中写入数组的函数循环

我正在尝试为此循环实现多处理.它无法修改数组,或者似乎没有正确排序作业(在最后一个函数完成之前返回数组).

import multiprocessing
import numpy


def func(i, array):
    array[i] = i**2
    print(i**2)

def main(n):
    array = numpy.zeros(n)

    if __name__ == '__main__':
        jobs = []
        for i in range(0, n):
            p = multiprocessing.Process(target=func, args=(i, array))
            jobs.append(p)
            p.start()

    return array

print(main(10))
Run Code Online (Sandbox Code Playgroud)

python arrays loops multiprocessing

3
推荐指数
1
解决办法
5891
查看次数