如何在超时后中止multiprocessing.Pool中的任务?

far*_*awa 14 python multiprocessing python-multiprocessing

我试图以这种方式使用python的多处理包:

featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
for f in featureClass:
  pool .apply_async(worker, args=f,callback=collectMyResult)
pool.close()
pool.join
Run Code Online (Sandbox Code Playgroud)

从池的进程我想避免等待超过60秒的那些返回其结果.那可能吗?

dan*_*ano 27

这是一种无需更改worker功能即可完成此操作的方法.我们的想法是将worker包装在另一个函数中,该函数将调用worker后台线程,然后等待timeout几秒钟的结果.如果超时到期,则会引发异常,这将突然终止worker正在执行的线程:

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def worker(x, y, z):
    pass # Do whatever here

def collectMyResult(result):
    print("Got result {}".format(result))

def abortable_worker(func, *args, **kwargs):
    timeout = kwargs.get('timeout', None)
    p = ThreadPool(1)
    res = p.apply_async(func, args=args)
    try:
        out = res.get(timeout)  # Wait timeout seconds for func to complete.
        return out
    except multiprocessing.TimeoutError:
        print("Aborting due to timeout")
        p.terminate()
        raise

if __name__ == "__main__":
    pool = multiprocessing.Pool()
    featureClass = [[1000,k,1] for k in drange(start,end,step)] #list of arguments
    for f in featureClass:
      abortable_func = partial(abortable_worker, worker, timeout=3)
      pool.apply_async(abortable_func, args=f,callback=collectMyResult)
    pool.close()
    pool.join()
Run Code Online (Sandbox Code Playgroud)

超时将提升的任何功能multiprocessing.TimeoutError.请注意,这意味着在发生超时时不会执行回调.如果这是不可接受的,只需更改exceptabortable_worker以返回内容而不是调用raise.

  • 有效吗?我尝试运行上面的示例代码,将worker func更改为`def worker(x, y, z): \n while 1:\n pass`,该过程不会结束。http://stackoverflow.com/a/24634225/3291799 确实有效,但以危险的方式。 (2认同)

thi*_*per 7

我们可以使用 gevent.Timeout 来设置worker运行的时间。gevent教程

from multiprocessing.dummy import Pool 
#you should install gevent.
from gevent import Timeout
from gevent import monkey
monkey.patch_all()
import time

def worker(sleep_time):
    try:

        seconds = 5  # max time the worker may run
        timeout = Timeout(seconds) 
        timeout.start()
        time.sleep(sleep_time)
        print "%s is a early bird"%sleep_time
    except:
        print "%s is late(time out)"%sleep_time

pool = Pool(4)

pool.map(worker, range(10))


output:
0 is a early bird
1 is a early bird
2 is a early bird
3 is a early bird
4 is a early bird
8 is late(time out)
5 is late(time out)
6 is late(time out)
7 is late(time out)
9 is late(time out)
Run Code Online (Sandbox Code Playgroud)