多处理策略 - 共享嵌套对象

Flo*_*Lie 6 python simulation multiprocessing

我正在从事一个研究项目,并希望应用并行化来提高执行速度。我之前曾与multiprocessing图书馆合作过,但仅用于数字运算。我将尝试简要描述我的设置和目标。我主要希望来自对多处理概念更有经验的人的想法。

该项目:

该项目是一个多回声供应链模拟(一个多级分销网络),其中根据传入的需求在每个位置定期做出重新订购决策。一个玩具示例如下所示:

  Level 3               Level 2                 Level 1             Level 0

                                         --- Local Warehouse 1
                                        |
             --- Central Warehouse 1 --
            |                           |
            |                            --- Local Warehouse 2
            |
Supplier --                                                        Customer
            |                            --- Local Warehouse 3
            |                           |
             --- Central Warehouse 2 --
                                        |
                                         --- Local Warehouse 4
Run Code Online (Sandbox Code Playgroud)

模拟对象(简化)如下:

class Simulation:
  self.locations = dict() #List of locations
  self.customer = Customer() #Object periodically ordering at deepest level (Local) Warehouses
  self.levels = {0: [], 1:[],..} # Locations by depth in network graph
  def run(self):
    for period in simulation_length:
      for level in self.levels:
        for location in level:
          #review orders and issue order if required

class Location:
  self.orders = [] #list of received orders
  def review(self):
     #Decides based on received orders if reorder required
  def order(self, order, other_location):
       simulation.locations[other_location].orders.append(order)
Run Code Online (Sandbox Code Playgroud)

因此,该过程如下所示:

  1. 客户(级别 0)向本地仓库(级别 1)发出订单
  2. 本地仓库(级别 1)审核订单并向中央仓库(级别 2)发出订单
  3. 依此类推,直到供应商
  4. 下一期

我的问题/想法

现在我有一个dict属于供应链特定级别的所有仓库,并且我按顺序遍历每个级别中的每个仓库(因此满足依赖关系)每个时期。关卡数量是有限制的,但是每个关卡的仓库数量很大,而且审核逻辑可能是计算密集型的,因此我的计划是并行审核属于同一级别的所有仓库

但是,由于位置使用order(self, order, other_location)访问模拟对象内另一个对象的属性的函数,我需要在进程之间共享整个模拟对象

思路和方法:

  1. 放置sumulation objectinshared memoryLock在对象上使用 a ,无论何时下订单(评论中的所有其他操作都是纯粹的读取操作)
  2. 不是直接下订单,而是将它们放在Queue主流程中,然后在一个级别内的所有仓库返回后,只需执行订单功能(计算成本低)

(1) 的问题:

从我所有的研究来看,只有CType对象ValueArray可以放在共享内存中。我无法弄清楚如何。我唯一读到的是multiprocessing Manager,但另一个计算器问题链接说,它不适用于嵌套对象。

(2) 的问题:

由于每个仓库对象在期间之间发生变化(订单到达,库存变化,..)我必须将仓库对象移交给每个期间的流程,以使其保持最新状态,这会产生大量开销(至少我认为是这样)

结论

我希望它清楚我想要实现的目标。任何对我这边误解的暗示、澄清或纠正都会很棒!

编辑@Roy12 的回答:

谢谢你的回答。我一定会看看 Dask,因为最终目标是利用集群。关于第一个提示,我想到了两个实现,我很感激你的建议:我的位置需要接收和发送订单对象,发送部分由对象本身控制,接收不是。因此对我来说选项 1 是

  1. 在一个周期开始时,带有最新位置对象的 spawn 进程会进行计算,而不是直接发送订单,而是将它们放入队列中并关闭进程。当整个级别完成后,主进程会分配订单并为下一个级别生成进程,依此类推。这导致定期生成和关闭过程,并且根据模拟长度,位置对象变得相当大

  2. 我在开始时静态地将位置映射到进程,并有一个传入队列和一个传出队列,并让主进程执行订单的分发,例如进程 1(位置 1)向进程 2(位置 2)发送订单是 -> 进程1 -> Main Process -> Process 2. 在这种情况下,每次处理订单时都需要给进程一个信号并执行例程(读取队列 -> 重新计算 -> 将订单发送到队列)

(2)对我来说似乎更复杂,但我对缺点没有感觉,否则最终收集必须被编程。如果它很重要,订单对象的大小约为 40 字节,位置对象(仓库)在整个运行过程中增长到约 15 mb

Roy*_*012 0

一个很好的用例。一些想法/建议:

  • 不要使用共享内存。如今这被认为是不好的做法。人们过去常常使用共享内存来实现并发,但现代方法是尽可能避免这种情况。例如,Go 语言提供了一些不错的替代方案(请参阅https://blog.golang.org/codelab-share)。共享内存的另一个缺点是您无法将工作分配到多台计算机上。
  • 使用队列通常要好得多。如果您要在进程之间移回的数据并不大(很多(很多)兆字节),则开销可以忽略不计。
  • 对于您的用例,您可能需要考虑使用分布式计算框架,例如Dask。它提供了简单的方法来收集子任务的结果,然后才开始处理层次结构中的下一个级别。此外,它允许您将工作分布到整个集群,而不仅仅是一台机器。

希望这可以帮助。

更新以下一些规模数据:

该问题指出,一个位置的大小为 15MB,一个订单的大小约为 40 字节(明显更小)。

鉴于此,很明显,如果我们针对低网络流量进行优化,我们将采用模型#1,其中每个位置都是一个贯穿整个模拟的进程,并与其他位置视图队列和消息进行通信。

但是 - 这是一个很大的但是 - 通过队列运行所有通信似乎是一个更复杂的实现。创建一个包含 15MB 数据的进程应该需要不到一秒的时间。如果每个位置的计算都非常重要,那么它可能需要比进程创建本身更多的时间。因此,我可能会从更简单的实现开始(为每个位置生成一个新流程)。

换句话说,围绕队列构建整个系统似乎有些不成熟的优化。

最后一点:有一个名为SimPy的 Python 模拟包。我不知道它的可扩展性如何,但它可能值得一看。