Python multiprocessing.Pool: kill a specific long-running or hung process

dra*_*jub 7 python timeout pool process multiprocessing

I need a pool that runs many parallel database connections and queries. I would like to use multiprocessing.Pool or the concurrent.futures ProcessPoolExecutor. Python 2.7.5.

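In some cases a query takes too long or never finishes (hung/zombie processes). I want to kill only the specific process that has timed out in the multiprocessing.Pool or concurrent.futures ProcessPoolExecutor.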
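For context: with Pool's public API, a timeout is only observed on the caller's side. `AsyncResult.get(timeout=...)` raises `multiprocessing.TimeoutError`, but the worker keeps running the task, and as far as I know the public API offers no per-worker kill, only `Pool.terminate()` for the whole pool. A minimal sketch, written without Python-3-only syntax; `slow` and `collect` are hypothetical names, with `slow` standing in for a database query:

```python
import multiprocessing
import time


def slow(seconds):
    """Hypothetical stand-in for a query that takes `seconds` to answer."""
    time.sleep(seconds)
    return seconds


def collect(durations, timeout):
    """Submit one task per duration; return (results, indices that timed out)."""
    pool = multiprocessing.Pool(processes=2)
    try:
        pending = [pool.apply_async(slow, (d,)) for d in durations]
        done, timed_out = [], []
        for i, async_result in enumerate(pending):
            try:
                # Waits up to `timeout` seconds from *this call*, not from
                # when the task started running in a worker.
                done.append(async_result.get(timeout=timeout))
            except multiprocessing.TimeoutError:
                # Only the caller gave up; the worker is still running the task.
                timed_out.append(i)
        return done, timed_out
    finally:
        # terminate() stops every worker, including the one still busy.
        pool.terminate()
        pool.join()
```

Because the timeout counts from each `get()` call, a task can end up running well past `timeout` seconds before the caller gives up on it. Call `collect` under an `if __name__ == '__main__':` guard, as in the code below.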

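Below is an example of how to kill/respawn the entire process pool. Ideally, though, I would minimize CPU thrashing, because I only want to kill the one long-running process that has not returned data after the timeout (in seconds).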
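One way to kill only the task that overran, without touching Pool internals, is to swap Pool for one `multiprocessing.Process` per task and enforce the timeout per process. This is a different technique from Pool, sketched under that assumption: `slow_task` and `run_with_timeouts` are hypothetical names, `slow_task` stands in for a database query, and each result comes back over its own `Pipe`, because the `Process.terminate()` documentation warns that terminating a process while it uses a pipe or queue can corrupt it for other processes.

```python
import multiprocessing
import time


def slow_task(seconds, conn):
    """Hypothetical stand-in for a database query: sleep, then send a result."""
    time.sleep(seconds)
    conn.send(seconds)
    conn.close()


def run_with_timeouts(durations, timeout, max_workers=4):
    """Run slow_task(d) for every d in durations, at most max_workers at once.

    A task still running `timeout` seconds after it was started is terminated
    on its own; the other tasks keep running. Returns {index: result}, with
    None for tasks that were killed or died without sending a result.
    """
    pending = list(enumerate(durations))
    running = {}  # index -> (process, read end of its pipe, start time)
    results = {}
    while pending or running:
        # Fill free slots with new tasks.
        while pending and len(running) < max_workers:
            i, seconds = pending.pop(0)
            read_end, write_end = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=slow_task, args=(seconds, write_end))
            p.start()
            write_end.close()  # only the child writes; lets the parent see EOF
            running[i] = (p, read_end, time.time())

        for i in list(running):
            p, read_end, started = running[i]
            if read_end.poll():
                try:
                    results[i] = read_end.recv()
                except EOFError:  # child exited without sending anything
                    results[i] = None
                p.join()
            elif time.time() - started > timeout:
                p.terminate()  # kill only this overrunning task
                p.join()
                results[i] = None
            else:
                continue  # still running and within its time budget
            read_end.close()
            del running[i]

        time.sleep(0.05)  # avoid a busy loop
    return results
```

Under an `if __name__ == '__main__':` guard, `run_with_timeouts([1, 30, 2], timeout=5)` should return `{0: 1, 1: None, 2: 2}`, with task 1 killed after about 5 seconds while tasks 0 and 2 finish normally. The trade-off is one process start per task instead of Pool's worker reuse.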

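For some reason, the code below never seems to finish terminating/joining the pool after all results have been returned. It may have to do with killing worker processes when a timeout occurs; still, the Pool does create new workers after they are killed, and the results are as expected.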

from multiprocessing import Pool
import time

import numpy as np


def f(x):
    # Simulate a query that takes x seconds.
    time.sleep(x)
    return x

if __name__ == '__main__':
    # 4 workers; each worker is replaced after completing 4 tasks.
    pool = Pool(processes=4, maxtasksperchild=4)

    # Submit 10 tasks with random durations of 0-9 seconds.
    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    while results:
        try:
            x, result = results.pop(0)
            start = time.time()
            # Wait at most 5 seconds for this particular result.
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)

        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Terminate every worker that is still running,
            # not only the one handling the task that timed out.
            for p in pool._pool:
                if p.exitcode is None:
                    p.terminate()

    pool.terminate()
    pool.join()

sta*_*sia 5

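I don't fully understand your question. You say you want to stop one specific process, but then, in the exception handler, you call terminate() on every worker that is still running. I'm not sure why you do that. Also, I'm fairly sure that using the internal variables of multiprocessing.Pool (such as pool._pool) is not very safe. Having said all that, I think your real question is why this program does not finish when a timeout occurs. If that is the problem, then the following does the trick: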

from multiprocessing import Pool
import time

import numpy as np


def f(x):
    # Simulate a query that takes x seconds.
    time.sleep(x)
    return x

if __name__ == '__main__':
    pool = Pool(processes=4, maxtasksperchild=4)

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()]

    # Elapsed times printed below are measured from this single start point.
    start = time.time()
    while results:
        try:
            x, result = results.pop(0)
            print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start)
        except Exception as e:
            print str(e)
            print '%d Timeout Exception! in %f' % (x, time.time()-start)
            # Walk pool._pool backwards so that deleting an entry does not
            # shift the indices that are still to be visited.
            for i in reversed(range(len(pool._pool))):
                p = pool._pool[i]
                if p.exitcode is None:
                    p.terminate()
                # Remove the worker from the pool's private list, whether it
                # was just terminated or had already exited.
                del pool._pool[i]

    pool.terminate()
    pool.join()

The key is that you need to remove the workers from the pool; just calling terminate() on them is not enough.