使用Python多处理管理器(BaseManager/SyncManager)与远程计算机共享队列时断管

Dav*_*vid 14 python pipe multiprocessing

在上个月,当我们尝试使用它来共享几个不同(Linux)计算机之间的队列时,我们遇到了Python 2.6.x多处理程序包的持续问题.我已经直接向Jesse Noller提出了这个问题,因为我们还没有发现任何可以解释StackOverflow,Python文档,源代码或其他在线问题的内容.

我们的工程师团队无法解决这个问题,我们已经向python用户组中的很多人提出了这个问题,但无济于事.我希望有人可以提供一些见解,因为我觉得我们做的事情不正确但是太接近问题而不能看到它是什么.

这是症状:

Traceback (most recent call last):
  File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue
    return queue, queue.get(block=False)
  File "<string>", line 2, in get
  File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
Run Code Online (Sandbox Code Playgroud)

(我正在显示我们的代码在共享队列对象上调用queue.get()的位置,由扩展SyncManger的管理器托管).

这个问题的特殊之处在于,如果我们在一台机器上连接到这个共享队列(让我们称之为machine A),即使是来自大量并发进程,我们似乎也不会遇到问题.只有当我们连接到队列(同样,使用扩展多处理SyncManager并且当前不添加其他功能的类)时,才能从其他计算机(让我们调用这些machines B and C)并同时运行大量项目进出队列我们遇到了问题.

就好像python的多处理包处理本地连接(即使它们仍然使用相同的manager.connect()连接方法)一样,machine A但是当从至少一个远程连接同时进行远程连接时,machines B or C我们得到一个Broken管道错误.

在我的团队所做的所有阅读中,我们认为问题与锁定有关.我们认为也许我们不应该使用Queue.Queue,而是multiprocessing.Queue,但我们切换并且问题持续存在(我们还注意到SyncManager自己的共享队列是Queue.Queue的一个实例).

我们正在研究如何调试问题,因为它很难重现,但确实经常发生(如果我们从队列中插入和.get()大量项目,则每天多次).

我们创建的方法get_from_queue尝试使用随机休眠间隔重试从队列中获取项目~10次,但似乎如果它失败一次,它将失败十次(这使我相信.register()和.connect ()管理员可能没有给服务器另一个套接字连接,但我无法通过阅读文档或查看Python内部源代码来确认这一点.

任何人都可以提供任何洞察我们可能会在哪里或我们如何跟踪实际发生的事情?

如果管道损坏,我们如何使用multiprocessing.BaseManager或启动新连接multiprocessing.SyncManager

我们怎样才能首先防止破裂的管道?

Dav*_*vid 9

仅供参考.如果其他人遇到同样的错误,在与Python的核心开发团队的Ask Solem和Jesse Noller进行了广泛的咨询后,看起来这实际上是当前python 2.6.x中的一个错误(可能是2.7+,可能是3.x) ).他们正在研究可能的解决方案,并且可能会在未来的Python版本中包含修复程序.

  • 是否有针对此的Python错误报告? (19认同)
  • 这曾经被修复过吗?我在 Python 3.8.0 中看到了类似的问题。 (4认同)

pan*_*kaj 7

我遇到了同样的问题,即使在python 2.7.1中连接localhost也是如此.经过一天的调试后,我找到了原因和解决方法:

原因:BaseProxy类具有缓存连接的线程本地存储,该连接将重复用于将来的连接,即使在创建新管理器时也会导致"管道损坏"错误

解决方法:在重新连接之前删除缓存的连接

from multiprocessing.managers import BaseProxy

...

if address in BaseProxy._address_to_local:
    del BaseProxy._address_to_local[address][0].connection
Run Code Online (Sandbox Code Playgroud)