Sam*_*ufi 10 python fork pickle shared-memory python-multiprocessing
简而言之
我想同时更改复杂的python对象,每个对象只由一个进程处理.我怎么能这样做(效率最高)?实施某种酸洗支持会有帮助吗?这会有效吗?
完整的问题
我有一个python数据结构ArrayDict,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行.在我的例子中,所有键都是整数.
a = ArrayDict()
a[1234] = 12.5
a[10] = 3
print(a[1234]) #12.5
print(a[10]) # 3.0
print(a[1234] == a.array[a.indexDict[1234]]) #true
Run Code Online (Sandbox Code Playgroud)
现在我有多个这样的ArrayDicts并希望填写它们myMethod(arrayDict, params).由于myMethod价格昂贵,我想并行运行它.请注意,myMethod可能会添加许多行arrayDict.每个过程都会改变自己的过程ArrayDict.我不需要并发访问ArrayDicts.
在myMethod,我更改了条目arrayDict(即,我更改了内部numpy数组),我添加了条目arrayDict(也就是说,我向字典添加另一个索引并在内部数组中写入一个新值).最终,我希望能够在arrayDict内部numpy阵列变得太小时进行交换.这不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作.即使没有阵列交换,我自己的尝试也没有成功.
我花了几天时间研究共享内存和python的多处理模块.由于我最终将在linux上工作,因此任务似乎相当简单:系统调用fork()允许有效地处理参数的副本.我的想法是ArrayDict在自己的进程中更改每个,返回对象的更改版本,并覆盖原始对象.为了节省内存并保存复制工作,我还使用了sharedmem数组来存储数据ArrayDict.我知道字典必须仍然被复制.
from sharedmem import sharedmem
import numpy as np
n = ... # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False
while not done:
consideredData = ... # numpy boolean array of length
# n with True at the index of
# considered data
args = ... # numpy array containing arguments
# for myMethod
with sharedmem.MapReduce() as pool:
results = pool.map(myMethod,
list(zip(myData[considered],
args[considered])),
star=True)
myData[considered] = results
done = ... # depends on what happens in
# myMethod
Run Code Online (Sandbox Code Playgroud)
我得到的是分段错误错误.我能够通过创建ArrayDicts的深度复制myMethod并将其保存到其中来避免此错误myData.我真的不明白为什么这是必要的,并且经常复制我的(可能是非常大的)数组(while循环需要很长时间)对我来说似乎并不高效.但是,至少它在一定程度上起作用.然而,由于共享内存,我的程序在第3次迭代时有一些错误的行为.因此,我认为我的方式不是最优的.
我在这里和这里读到可以使用在共享内存上保存aribtrary numpy数组multiprocessing.Array.但是,我仍然需要分享整个ArrayDict,其中特别包括字典,而字典又不是可选择的.
我怎样才能以有效的方式实现目标?以某种方式使我的对象可以选择是否可能(并且有效)?
所有解决方案必须在64位Linux上运行python 3和完全numpy/scipy支持.
编辑
我在这里发现,使用多处理"管理器"类和用户定义的代理类以某种方式可以共享任意对象.这会有效吗?我想利用我不需要并发访问对象,即使它们没有在主进程中处理.是否可以为我想要处理的每个对象创建一个管理器?(我可能仍然对管理者的工作方式有一些误解.)
这似乎是一个相当复杂的课程,并且我无法完全预期该解决方案是否适合您的情况。对于这样一个复杂的类,一个简单的折衷方法是使用ProcessPoolExecutor。
如果这不能回答您的问题,那么以最少的可行示例为佳。
from concurrent.futures import ProcessPoolExecutor
import numpy as np
class ArrayDict ():
keys = None
vals = None
def __init__ (self):
self.keys = dict ()
self.vals = np.random.rand (1000)
def __str__ (self):
return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())
def myMethod (ad, args):
print ("starting:", ad)
if __name__ == '__main__':
l = [ArrayDict() for _ in range (5)]
args = [2, 3, 4, 1, 3]
with ProcessPoolExecutor (max_workers = 2) as ex:
d = ex.map (myMethod, l, args)
Run Code Online (Sandbox Code Playgroud)
当对象被发送到子进程时被克隆,您需要返回结果(因为对对象的更改不会传播回主进程)并处理您要如何存储它们。
请注意,对类变量的更改将传播到同一进程中的其他对象,例如,如果您的任务多于进程,则对在同一进程中运行的实例之间共享对类变量的更改。这通常是不希望的行为。
这是并行化的高级接口。ProcessPoolExecutor使用该multiprocessing模块,并且只能与可拾取对象一起使用。我怀疑其ProcessPoolExecutor性能类似于“进程之间的共享状态”。在底层,ProcessPoolExecutor 正在使用multiprocessing.Process,并且应表现出与相似的性能Pool(除非将非常长的iterable与map一起使用)。ProcessPoolExecutor似乎确实是将来用于python中的并发任务的API。
如果可以,通常使用ThreadPoolExecutor可以更快(可以将替换为ProcessPoolExecutor)。在这种情况下,对象被在进程之间共享,并更新到一个将传播回到主线程。
如前所述,最快的选择可能是重新组织,ArrayDict以便仅使用可以由multiprocessing.Value或表示的对象Array。
如果ProcessPoolExecutor不起作用,并且您无法进行优化ArrayDict,则可能会因使用Manager。这里有很好的例子说明如何做到这一点。
通常可以在中找到最大的性能提升
myMethod。而且,正如我所提到的,使用线程的开销要小于进程的开销。
| 归档时间: |
|
| 查看次数: |
1117 次 |
| 最近记录: |