Tei*_*ion 259 python multithreading timeout python-multithreading
我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本.
如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?
pir*_*iro 204
如果在UNIX上运行,则可以使用信号包:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
调用10秒后,调用alarm.alarm(10)处理程序.这引发了一个例外,您可以从常规Python代码中截取.
这个模块不适合线程(但那么,谁呢?)
请注意,由于我们在超时发生时引发异常,因此它可能最终被捕获并在函数内被忽略,例如一个这样的函数:
def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue
ATO*_*TOA 136
您可以使用multiprocessing.Process这样做.
码
import multiprocessing
import time
# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)
if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()
    # Wait for 10 seconds or until process finishes
    p.join(10)
    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."
        # Terminate
        p.terminate()
        p.join()
Aar*_*all 70
如何调用该函数或将其包装在哪里,以便如果脚本取消时间超过5秒?
我发布了一个要点,用装饰器来解决这个问题/问题threading.Timer.这是故障.
它使用Python 2和3进行了测试.它也应该在Unix/Linux和Windows下运行.
首先是进口.无论Python版本如何,这些都试图保持代码一致:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread
使用版本无关的代码:
try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass
现在我们从标准库中导入了我们的功能.
exit_after 装饰接下来我们需要一个函数来终止main()子线程:
def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt
这是装饰者本身:
def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer
这里的用法直接回答了你关于5秒后退出的问题!:
@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt
第二个函数调用不会完成,而是进程应该以traceback退出!
KeyboardInterrupt 并不总是停止睡眠线程请注意,Windows上的Python 2上的键盘中断并不总是会中断睡眠,例如:
@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')
>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt
除非明确检查PyErr_CheckSignals(),否则它是否可能会中断在扩展中运行的代码,请参阅   Cython,Python和KeyboardInterrupt忽略
在任何情况下,我都会避免睡一个线程超过一秒 - 这是处理器时间的一个因素.
如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?
要捕获它并执行其他操作,您可以捕获KeyboardInterrupt.
>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Ale*_*lex 48
我有一个不同的提议,它是一个纯函数(使用与线程建议相同的API)并且似乎工作正常(基于此线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal
    class TimeoutError(Exception):
        pass
    def handler(signum, frame):
        raise TimeoutError()
    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)
    return result
Ric*_*ich 29
在单元测试中搜索超时调用时,我遇到了这个线程.我没有在答案或第三方包中找到任何简单的内容,所以我在下面编写了装饰器,你可以直接进入代码:
import multiprocessing.pool
import functools
def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
然后就这么简单来超时测试或任何你喜欢的功能:
@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Bri*_*ian 16
有很多建议,但没有使用concurrent.futures,我认为这是最清晰的处理方式.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)
超级简单的阅读和维护.
我们创建一个池,提交一个进程,然后等待最多5秒钟,然后提出一个TimeoutError,你可以捕获并处理你需要的东西.
原生于python 3.2+并向后移植到2.7(pip install期货).
线程和进程之间的切换是在更换为简单ProcessPoolExecutor用ThreadPoolExecutor.
如果你想在超时时终止进程,我建议你看看Pebble.
bit*_*nox 14
我是 wrapt_timeout_decorator 的作者
乍一看,这里介绍的大多数解决方案在 Linux 下都能很好地工作——因为我们有 fork() 和信号 ()——但在 Windows 上,情况看起来有点不同。当谈到 Linux 上的子线程时,你不能再使用信号了。
为了在 Windows 下生成一个进程,它需要是可腌制的——而许多装饰函数或类方法不是。
所以你需要使用更好的pickler,比如dill和multiprocess(不是pickle和multiprocessing)——这就是为什么你不能使用ProcessPoolExecutor(或只能使用有限的功能)。
对于超时本身 - 您需要定义超时的含义 - 因为在 Windows 上,生成进程需要相当长的(且无法确定的)时间。这在短超时时可能会很棘手。让我们假设,产生这个过程大约需要 0.5 秒(很容易!!!)。如果你给出 0.2 秒的超时时间会发生什么?函数是否应该在 0.5 + 0.2 秒后超时(让方法运行 0.2 秒)?或者被调用的进程应该在 0.2 秒后超时(在这种情况下,装饰函数将始终超时,因为在那个时候它甚至没有产生)?
嵌套装饰器也可能很糟糕,您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,则需要考虑(并测试)所有这些。
其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用 - 不支持记录到另一个进程中的文件)
我试图涵盖所有边缘情况,您可能会查看包 wrapt_timeout_decorator,或者至少测试您自己的解决方案,其灵感来自那里使用的单元测试。
@Alexis Eggermont - 不幸的是我没有足够的分数来评论 - 也许其他人可以通知你 - 我想我解决了你的导入问题。
boo*_*gie 12
通过@piro 构建和增强答案,您可以构建一个上下文管理器。这允许非常易读的代码,在成功运行后将禁用警报信号(设置 signal.alarm(0))
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise Exception(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    yield
    signal.alarm(0)
def sleeper(duration):
    time.sleep(duration)
    print('finished')
用法示例:
In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished
In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 
<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)
Exception: block timedout after 2 seconds
出色,易于使用且可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装方式:
pip install timeout-decorator
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i
if __name__ == '__main__':
    mytest()
timeout-decorator不能在 windows 系统上工作,因为 windows 没有很好的支持signal。
如果你在 windows 系统中使用 timeout-decorator 你会得到以下结果
AttributeError: module 'signal' has no attribute 'SIGALRM'
有些人建议使用use_signals=False但对我不起作用。
作者@bitranox 创建了以下包:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
代码示例:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))
def main():
    mytest('starting')
if __name__ == '__main__':
    main()
给出以下异常:
TimeoutError: Function mytest timed out after 5 seconds
为了防止它对任何人有帮助,在@piro的答案的基础上,我制作了一个函数装饰器:
import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")
            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)
            # Define a timeout for your function
            signal.alarm(timeout_secs)
            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)
            return result
        return time_limited
    return wrapper
在具有超时的函数上使用包装器20 seconds将类似于:
    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")
    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")
我们可以使用信号来实现同样的目的。我认为下面的例子对你有用。与线程相比,它非常简单。
import signal
def timeout(signum, frame):
    raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
    main()
except myException:
    print "whoops"
TimeoutError使用异常来提醒超时 - 可以轻松修改有关并行地图的完整解释和扩展,请参见此处https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts
>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s
正如预期的那样
>>> bar(2)
2
import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill
from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
class TimeoutError(Exception):
    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__
    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"
def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))
def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout
    Args:
        func: the function
        timeout: The timeout in seconds
    """
    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')
    if func is None:
        return functools.partial(killer_call, timeout=timeout)
    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners
if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x
    print(bar(2))
    bar(10)
由于工作方式,您需要在函数内部导入dill。
这也意味着doctest如果目标函数中有导入,这些函数可能不兼容。您会遇到__import__找不到的问题。
| 归档时间: | 
 | 
| 查看次数: | 266679 次 | 
| 最近记录: |