使用multiprocessing.Managers构建一个简单的远程调度程序

pie*_*ate 5 python multiprocessing

请考虑以下代码:

服务器:

import sys
from multiprocessing.managers import BaseManager, BaseProxy, Process

def baz(aa) :
    l = []
    for i in range(3) :
      l.append(aa)
    return l

class SolverManager(BaseManager): pass

class MyProxy(BaseProxy): pass

manager = SolverManager(address=('127.0.0.1', 50000), authkey='mpm')
manager.register('solver', callable=baz, proxytype=MyProxy)

def serve_forever(server):
    try :
        server.serve_forever()
    except KeyboardInterrupt:
        pass

def runpool(n):
    server = manager.get_server()
    workers = []

    for i in range(int(n)):
        Process(target=serve_forever, args=(server,)).start()

if __name__ == '__main__':
    runpool(sys.argv[1])
Run Code Online (Sandbox Code Playgroud)

客户:

import sys
from multiprocessing.managers import BaseManager, BaseProxy

import multiprocessing, logging

class SolverManager(BaseManager): pass

class MyProxy(BaseProxy): pass

def main(args) :
    SolverManager.register('solver')
    m = SolverManager(address=('127.0.0.1', 50000), authkey='mpm')
    m.connect()

    print m.solver(args[1])._getvalue()

if __name__ == '__main__':
    sys.exit(main(sys.argv))
Run Code Online (Sandbox Code Playgroud)

如果我只使用一个进程来运行服务器 python server.py 1

然后客户端按预期工作.但是如果我产生两个进程(python server.py 2)监听连接,我会得到一个令人讨厌的错误:

$python client.py ping
Traceback (most recent call last):
  File "client.py", line 24, in <module>
sys.exit(main(sys.argv))
  File "client.py", line 21, in main
    print m.solver(args[1])._getvalue()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 637, in temp
    authkey=self._authkey, exposed=exp
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 894, in AutoProxy
    incref=incref)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 700, in __init__
    self._incref()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 750, in _incref
    dispatch(conn, None, 'incref', (self._id,))
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 79, in dispatch
    raise convert_to_error(kind, result)
multiprocessing.managers.RemoteError: 
---------------------------------------------------------------------------
Traceback (most recent call last):
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 181, in handle_request
    result = func(c, *args, **kwds)
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 402, in incref
    self.id_to_refcount[ident] += 1
KeyError: '7fb51084c518'
---------------------------------------------------------------------------
Run Code Online (Sandbox Code Playgroud)

我的想法非常简单.我想创建一个服务器,它将生成许多工作人员,这些工作人员将共享同一个套接字并独立处理请求.也许我在这里使用了错误的工具?

目标是构建一个3层结构,其中所有请求都通过http服务器处理,然后通过多处理管理器分派到位于集群中的节点以及从节点到工作人员......

根据核心数量,每台机器上有一台公共服务器,每台机器一个节点和x个工作人员......我知道我可以使用更复杂的库,但是这样一个简单的任务(我只是在这里进行原型设计) )我只是使用多处理库......这是可能的还是我应该直接探索其他解决方案?我觉得我非常接近在这里工作......谢谢.

Men*_*kov 1


你正在尝试发明一个轮子,许多人以前已经发明过。

在我看来,您正在寻找服务器将任务分派到的任务队列,并且您的工作人员执行该任务。

我建议你看看Celery