如何将python dict与多处理同步

Pet*_*mit 24 python dictionary multiprocessing

我使用Python 2.6和多处理模块进行多线程处理.现在我想要一个同步的dict(我真正需要的唯一原子操作是值上的+ =运算符).

我应该用multiprocessing.sharedctypes.synchronized()调用来包装dict吗?还是另一种方式去?

man*_*est 55

介绍

似乎有很多扶手椅建议,没有工作实例.这里列出的答案都没有建议使用多处理,这有点令人失望和令人不安.作为python爱好者,我们应该支持我们的内置库,虽然并行处理和同步从来都不是一件小事,但我相信它可以通过适当的设计变得微不足道.这在现代多核架构中变得非常重要,并且不能过分强调!也就是说,我对多处理库很不满意,因为它仍然处于初期阶段,存在很多陷阱,错误,并且面向功能编程(我讨厌).由于多处理在服务器运行时无法共享新创建的对象的严重限制,我目前仍然更喜欢Pyro模块(它超前于时间).管理器对象的"register"类方法只会在管理器(或其服务器)启动之前实际注册一个对象.足够的聊天,更多代码:

Server.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


syncdict = {}
def get_dict():
    return syncdict

if __name__ == "__main__":
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    raw_input("Press any key to kill server".center(50, "-"))
    manager.shutdown()
Run Code Online (Sandbox Code Playgroud)

在上面的代码示例中,Server.py使用了多处理的SyncManager,它可以提供同步的共享对象.此代码无法在解释器中运行,因为多处理库对于如何为每个已注册对象查找"可调用"非常敏感.运行Server.py将启动一个自定义SyncManager,该SyncManager共享syncdict字典以使用多个进程,并且可以在同一台计算机上连接到客户端,或者如果在环回以外的IP地址上运行,则运行其他计算机.在这种情况下,服务器在端口5000上的环回(127.0.0.1)上运行.使用authkey参数在操作syncdict时使用安全连接.按下任何键时,管理器将关闭.

Client.py

from multiprocessing.managers import SyncManager
import sys, time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
         #if the key doesn't exist create it
         if not syncdict.has_key(key):
             syncdict.update([(key, 0)])
         #increment key value every sleep seconds
         #then print syncdict
         while True:
              syncdict.update([(key, syncdict.get(key) + inc)])
              time.sleep(sleep)
              print "%s" % (syncdict)
    except KeyboardInterrupt:
         print "Killed client"
Run Code Online (Sandbox Code Playgroud)

客户端还必须创建一个自定义的SyncManager,注册"syncdict",这次没有传入一个callable来检索共享的dict.然后,它使用定制的SycnManager使用端口5000上的环回IP地址(127.0.0.1)进行连接,并使用authkey建立与Server.py中启动的管理器的安全连接.它通过调用管理器上注册的callable来检索共享的dict syncdict.它会提示用户输入以下内容:

  1. 同步操作的关键
  2. 每个周期增加密钥访问值的数量
  3. 每个周期睡眠的时间(以秒为单位)

然后客户端检查密钥是否存在.如果没有,则会在syncdict上创建密钥.然后客户端进入一个"无限"循环,它通过增量更新密钥的值,睡眠指定的数量,并打印syncdict只重复此过程,直到发生KeyboardInterrupt(Ctrl + C).

恼人的问题

  1. 必须在管理器启动之前调用管理器的注册方法,否则即使管理器上的dir调用显示它确实具有已注册的方法,您也将获得异常.
  2. dict的所有操作必须用方法完成而不是dict赋值(syncdict ["blast"] = 2会因为多处理共享自定义对象的方式而失败)
  3. 使用SyncManager的dict方法可以缓解恼人的问题#2,除了恼人的问题#1阻止SyncManager.dict()返回的代理被注册和共享.(SyncManager.dict()只能在管理器启动后调用,并且注册只能在管理器启动之前工作,因此SyncManager.dict()仅在进行函数式编程并将代理作为参数传递给Processes时才有用. doc示例))
  4. 服务器和客户端都必须注册,即使直观地看起来客户端只能在连接到管理器后能够解决它(请将此添加到您的愿望列表多处理开发人员)

闭幕

我希望你能像我一样享受这个非常彻底和稍微耗时的答案.我很难直接理解为什么我在多处理模块中苦苦挣扎,Pyro让它变得轻而易举,而现在由于这个答案,我已经敲了敲头.我希望这对于如何改进多处理模块的python社区是有用的,因为我相信它有很大的希望,但在它的初期缺乏可能性.尽管描述了令人烦恼的问题,但我认为这仍然是一个非常可行的替代方案并且非常简单.您也可以使用SyncManager.dict()并将其作为参数传递给过程,就像文档显示的方式一样,它可能是一个更简单的解决方案,这取决于您的要求,这对我来说感觉不自然.