ivo*_*ivo 5 python shutdown pool worker multiprocessing
我有一个简单的服务器:
from multiprocessing import Pool, TimeoutError
import time
import os
if __name__ == '__main__':
# start worker processes
pool = Pool(processes=1)
while True:
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
try:
print(res.get(timeout=1)) # prints the PID of that process
except TimeoutError:
print('worker timed out')
time.sleep(5)
pool.close()
print("Now the pool is closed and no longer available")
pool.join()
print("Done")
Run Code Online (Sandbox Code Playgroud)
如果我运行这个,我会得到类似的东西:
47292
47292
Run Code Online (Sandbox Code Playgroud)
然后我kill 47292
在服务器运行时.启动了新的工作进程,但服务器的输出是:
47292
47292
worker timed out
worker timed out
worker timed out
Run Code Online (Sandbox Code Playgroud)
池仍在尝试向旧工作进程发送请求.
我已经完成了在服务器和工作程序中捕获信号的一些工作,我可以获得稍微好一点的行为,但服务器似乎仍在等待关闭死亡的孩子(即.pool.join()永远不会结束) .
处理工人死亡的正确方法是什么?
如果没有工人死亡,那么从服务器进程中正常关闭工作人员似乎才有效.
(在Python 3.4.4上,但如果有帮助的话,很高兴升级.)
更新:有趣的是,如果使用processes = 2创建池并且您杀死一个工作进程,等待几秒钟并杀死另一个工作进程,则不会发生此工作程序超时问题.但是,如果您快速连续杀死两个工作进程,则"工作超时"问题会再次出现.
也许相关的是,当问题发生时,终止服务器进程将使工作进程保持运行.
这种行为来自于 的设计multiprocessing.Pool
。当你杀死一名工人时,你可能会杀死持有call_queue.rlock
. 当这个进程在持有锁的情况下被杀死时,其他进程将无法再读入call_queue
,从而破坏了,Pool
因为它无法再与其工作线程通信。
所以实际上没有办法杀死一个工人并确保你Pool
之后仍然没事,因为你可能最终陷入僵局。
multiprocessing.Pool
不处理工人的死亡。您可以尝试使用concurrent.futures.ProcessPoolExecutor
(使用稍微不同的 API)来替代,它默认处理进程的失败。当进程在 中终止时ProcessPoolExecutor
,整个执行器将关闭,并且您会返回一个BrokenProcessPool
错误。
请注意,此实现中还存在其他死锁,应在loky
. (免责声明:我是这个库的维护者)。另外,您还可以使用 a和 方法loky
调整现有的大小。如果您有兴趣,请告诉我,我可以从这个包开始为您提供一些帮助。(我意识到我们仍然需要在文档上做一些工作...0_0)executor
ReusablePoolExecutor
_resize
归档时间: |
|
查看次数: |
2265 次 |
最近记录: |