分别在并行进程中更改不同的python对象

Sam*_*ufi 10 python fork pickle shared-memory python-multiprocessing

简而言之

我想同时更改复杂的python对象,每个对象只由一个进程处理.我怎么能这样做(效率最高)?实施某种酸洗支持会有帮助吗?这会有效吗?

完整的问题

我有一个python数据结构ArrayDict,基本上由一个numpy数组和一个字典组成,并将任意索引映射到数组中的行.在我的例子中,所有键都是整数.

a = ArrayDict()

a[1234] = 12.5
a[10] = 3

print(a[1234])                               #12.5
print(a[10])                                 # 3.0

print(a[1234] == a.array[a.indexDict[1234]]) #true
Run Code Online (Sandbox Code Playgroud)

现在我有多个这样的ArrayDicts并希望填写它们myMethod(arrayDict, params).由于myMethod价格昂贵,我想并行运行它.请注意,myMethod可能会添加许多行arrayDict.每个过程都会改变自己的过程ArrayDict.我不需要并发访问ArrayDicts.

myMethod,我更改了条目arrayDict(即,我更改了内部numpy数组),我添加了条目arrayDict(也就是说,我向字典添加另一个索引并在内部数组中写入一个新值).最终,我希望能够在arrayDict内部numpy阵列变得太小时进行交换.这不会经常发生,如果没有更好的解决方案,我可以在程序的非并行部分执行此操作.即使没有阵列交换,我自己的尝试也没有成功.

我花了几天时间研究共享内存和python的多处理模块.由于我最终将在linux上工作,因此任务似乎相当简单:系统调用fork()允许有效地处理参数的副本.我的想法是ArrayDict在自己的进程中更改每个,返回对象的更改版本,并覆盖原始对象.为了节省内存并保存复制工作,我还使用了sharedmem数组来存储数据ArrayDict.我知道字典必须仍然被复制.

from sharedmem import sharedmem
import numpy as np

n = ...                   # length of the data array
myData = np.empty(n, dtype=object)
myData[:] = [ArrayDict() for _ in range(n)]
done = False

while not done:
    consideredData = ...  # numpy boolean array of length
                          # n with True at the index of
                          # considered data
    args = ...            # numpy array containing arguments
                          # for myMethod

    with sharedmem.MapReduce() as pool:
        results = pool.map(myMethod, 
                           list(zip(myData[considered], 
                                    args[considered])),
                           star=True)
        myData[considered] = results

    done = ...             # depends on what happens in
                           # myMethod
Run Code Online (Sandbox Code Playgroud)

我得到的是分段错误错误.我能够通过创建ArrayDicts的深度复制myMethod并将其保存到其中来避免此错误myData.我真的不明白为什么这是必要的,并且经常复制我的(可能是非常大的)数组(while循环需要很长时间)对我来说似乎并不高效.但是,至少它在一定程度上起作用.然而,由于共享内存,我的程序在第3次迭代时有一些错误的行为.因此,我认为我的方式不是最优的.

在这里这里读到可以使用在共享内存上保存aribtrary numpy数组multiprocessing.Array.但是,我仍然需要分享整个ArrayDict,其中特别包括字典,而字典又不是可选择的.

我怎样才能以有效的方式实现目标?以某种方式使我的对象可以选择是否可能(并且有效)?

所有解决方案必须在64位Linux上运行python 3和完全numpy/scipy支持.

编辑

我在这里发现,使用多处理"管理器"类和用户定义的代理类以某种方式可以共享任意对象.这会有效吗?我想利用我不需要并发访问对象,即使它们没有在主进程中处理.是否可以为我想要处理的每个对象创建一个管理器?(我可能仍然对管理者的工作方式有一些误解.)

gau*_*teh 6

这似乎是一个相当复杂的课程,并且我无法完全预期该解决方案是否适合您的情况。对于这样一个复杂的类,一个简单的折衷方法是使用ProcessPoolExecutor

如果这不能回答您的问题,那么以最少的可行示例为佳。

from concurrent.futures import ProcessPoolExecutor

import numpy as np

class ArrayDict ():
  keys = None
  vals = None

  def __init__ (self):
    self.keys = dict ()
    self.vals = np.random.rand (1000)

  def __str__ (self):
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean())

def myMethod (ad, args):
  print ("starting:", ad)


if __name__ == '__main__':
  l     = [ArrayDict() for _ in range (5)]
  args  = [2, 3, 4, 1, 3]

  with ProcessPoolExecutor (max_workers = 2) as ex:

    d = ex.map (myMethod, l, args)
Run Code Online (Sandbox Code Playgroud)

当对象被发送到子进程时被克隆,您需要返回结果(因为对对象的更改不会传播回主进程)并处理您要如何存储它们。

请注意,对类变量的更改将传播到同一进程中的其他对象,例如,如果您的任务多于进程,则对在同一进程中运行的实例之间共享对类变量的更改。这通常是不希望的行为。

这是并行化的高级接口。ProcessPoolExecutor使用该multiprocessing模块,并且只能与可拾取对象一起使用。我怀疑其ProcessPoolExecutor性能类似于“进程之间的共享状态”。在底层,ProcessPoolExecutor 正在使用multiprocessing.Process,并且应表现出与相似的性能Pool(除非将非常长的iterable与map一起使用)。ProcessPoolExecutor似乎确实是将来用于python中的并发任务的API。

如果可以,通常使用ThreadPoolExecutor可以更快(可以将替换为ProcessPoolExecutor)。在这种情况下,对象在进程之间共享,并更新到一个将传播回到主线程。

如前所述,最快的选择可能是重新组织,ArrayDict以便仅使用可以由multiprocessing.Value或表示的对象Array

如果ProcessPoolExecutor不起作用,并且您无法进行优化ArrayDict,则可能会因使用Manager。这里有很好的例子说明如何做到这一点

通常可以在中找到最大的性能提升myMethod。而且,正如我所提到的,使用线程的开销要小于进程的开销。