函数调用超时

Tei*_*ion 259 python multithreading timeout python-multithreading

我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本.

如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?

pir*_*iro 204

如果在UNIX上运行,则可以使用信号包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print "Forever is over!"
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print "sec"
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print exc
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
Run Code Online (Sandbox Code Playgroud)

调用10秒后,调用alarm.alarm(10)处理程序.这引发了一个例外,您可以从常规Python代码中截取.

这个模块不适合线程(但那么,谁呢?)

请注意,由于我们在超时发生时引发异常,因此它可能最终被捕获并在函数内被忽略,例如一个这样的函数:

def loop_forever():
    while 1:
        print 'sec'
        try:
            time.sleep(10)
        except:
            continue
Run Code Online (Sandbox Code Playgroud)

  • 关于线程的警告.signal.alarm仅适用于主线程.我尝试在Django视图中使用它 - 立即失败,仅关于主线程的措辞. (12认同)
  • @flypen那是因为`signal.alarm`和相关的`SIGALRM`在Windows平台上不可用. (10认同)
  • 我使用Python 2.5.4.有这样一个错误:Traceback(最近一次调用last):文件"aa.py",第85行,在func signal.signal(signal.SIGALRM,handler)中AttributeError:'module'对象没有属性'SIGALRM' (5认同)
  • 如果有很多进程,并且每次调用`signal.signal` ---它们都能正常工作吗?每个`signal.signal`调用都不会取消"并发"调用吗? (2认同)
  • 如果您需要这样做:将警报设置回 0 以取消“signal.alarm(0)”(请参阅​​ /sf/ask/1890918921/)。 (2认同)

ATO*_*TOA 136

您可以使用multiprocessing.Process这样做.

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()
Run Code Online (Sandbox Code Playgroud)

  • 如何获取目标方法的返回值? (29认同)
  • 如果被调用的函数卡在I/O块上,这似乎不起作用. (4认同)
  • @bad_keypoints看到这个答案:http://stackoverflow.com/a/10415215/1384471基本上,你传递一个列表,你把答案放入. (3认同)
  • @ATOzTOA这个解决方案的问题,至少在我的目的,是它可能不允许儿童踏板自己清洁.从terminate函数`terminate()的文档...注意,不会执行退出处理程序和finally子句等.请注意,该过程的后代进程不会被终止 - 它们将简单地变成孤立的 (2认同)

Aar*_*all 70

如何调用该函数或将其包装在哪里,以便如果脚本取消时间超过5秒?

我发布了一个要点,用装饰器来解决这个问题/问题threading.Timer.这是故障.

兼容性的导入和设置

它使用Python 2和3进行了测试.它也应该在Unix/Linux和Windows下运行.

首先是进口.无论Python版本如何,这些都试图保持代码一致:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread
Run Code Online (Sandbox Code Playgroud)

使用版本无关的代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass
Run Code Online (Sandbox Code Playgroud)

现在我们从标准库中导入了我们的功能.

exit_after 装饰

接下来我们需要一个函数来终止main()子线程:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)

这是装饰者本身:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer
Run Code Online (Sandbox Code Playgroud)

用法

这里的用法直接回答了你关于5秒后退出的问题!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')
Run Code Online (Sandbox Code Playgroud)

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)

第二个函数调用不会完成,而是进程应该以traceback退出!

KeyboardInterrupt 并不总是停止睡眠线程

请注意,Windows上的Python 2上的键盘中断并不总是会中断睡眠,例如:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)

除非明确检查PyErr_CheckSignals(),否则它是否可能会中断在扩展中运行的代码,请参阅 Cython,Python和KeyboardInterrupt忽略

在任何情况下,我都会避免睡一个线程超过一秒 - 这是处理器时间的一个因素.

如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?

要捕获它并执行其他操作,您可以捕获KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Run Code Online (Sandbox Code Playgroud)

  • 为什么我必须调用`thread.interrupt_main()`,为什么我不能直接引发异常? (3认同)

Ale*_*lex 48

我有一个不同的提议,它是一个纯函数(使用与线程建议相同的API)并且似乎工作正常(基于此线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
Run Code Online (Sandbox Code Playgroud)

  • Max,不是真的 - 适用于任何符合POSIX标准的unix.我认为您的评论应该更准确,不适用于Windows. (15认同)
  • 这不是最好的解决方案,因为它只适用于Linux. (11认同)
  • 还有一点需要注意:Unix信号方法只有在主线程中应用它时才有效.在子线程中应用它会引发异常并且不起作用. (9认同)
  • 你应该避免将kwargs设置为空的dict.一个常见的Python问题是函数的默认参数是可变的.因此,字典将在所有"超时"调用中共享.最好将默认值设置为"None",并在函数的第一行添加`kwargs = kwargs或{}`.Args没问题,因为元组不可变. (6认同)
  • 您还应该还原原始信号处理程序.见http://stackoverflow.com/questions/492519/timeout-on-a-python-function-call/494273#comment8635219_494273 (3认同)

Ric*_*ich 29

在单元测试中搜索超时调用时,我遇到了这个线程.我没有在答案或第三方包中找到任何简单的内容,所以我在下面编写了装饰器,你可以直接进入代码:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
Run Code Online (Sandbox Code Playgroud)

然后就这么简单来超时测试或任何你喜欢的功能:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...
Run Code Online (Sandbox Code Playgroud)

  • 请注意,因为在达到超时后不会终止该功能! (12认同)
  • 是的,这需要一些调整。它让线程永远持续下去。 (2认同)
  • IDK如果这是最好的方法,但你可以在func_wrapper中尝试/捕获`Exception`并在catch之后执行`pool.close()`以确保线程无论如何都会在之后死亡.然后你可以抛出`TimeoutError`或者你想要的任何东西.似乎为我工作. (2认同)
  • 这很有用,但是一旦我做了很多次,我得到`RuntimeError:无法启动新线程`.如果我忽略它还是有其他东西可以解决这个问题吗?提前致谢! (2认同)

Bri*_*ian 16

有很多建议,但没有使用concurrent.futures,我认为这是最清晰的处理方式.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)
Run Code Online (Sandbox Code Playgroud)

超级简单的阅读和维护.

我们创建一个池,提交一个进程,然后等待最多5秒钟,然后提出一个TimeoutError,你可以捕获并处理你需要的东西.

原生于python 3.2+并向后移植到2.7(pip install期货).

线程和进程之间的切换是在更换为简单ProcessPoolExecutorThreadPoolExecutor.

如果你想在超时时终止进程,我建议你看看Pebble.

  • @ScottStafford进程/线程不会因为引发了TimeoutError而结束.因此,进程或线程仍将尝试运行完成,并且不会在超时时自动返回控制权. (5认同)
  • “警告:如果超时,这不会终止功能”是什么意思? (2认同)

ege*_*and 15

stopit在pypi上找到的软件包似乎很好地处理了超时.

我喜欢@stopit.threading_timeoutable装饰器,它timeout为装饰函数添加一个参数,它可以满足你的期望,它会停止这个功能.

在pypi上查看:https://pypi.python.org/pypi/stopit

  • 对于那些可能像我一样感到困惑的人:“stopit.utils.TimeoutException”不会停止您的代码!此后代码继续正常!我在一个运行正常的程序上花了 30 分钟.. 非常好的答案! (2认同)

bit*_*nox 14

我是 wrapt_timeout_decorator 的作者

乍一看,这里介绍的大多数解决方案在 Linux 下都能很好地工作——因为我们有 fork() 和信号 ()——但在 Windows 上,情况看起来有点不同。当谈到 Linux 上的子线程时,你不能再使用信号了。

为了在 Windows 下生成一个进程,它需要是可腌制的——而许多装饰函数或类方法不是。

所以你需要使用更好的pickler,比如dill和multiprocess(不是pickle和multiprocessing)——这就是为什么你不能使用ProcessPoolExecutor(或只能使用有限的功能)。

对于超时本身 - 您需要定义超时的含义 - 因为在 Windows 上,生成进程需要相当长的(且无法确定的)时间。这在短超时时可能会很棘手。让我们假设,产生这个过程大约需要 0.5 秒(很容易!!!)。如果你给出 0.2 秒的超时时间会发生什么?函数是否应该在 0.5 + 0.2 秒后超时(让方法运行 0.2 秒)?或者被调用的进程应该在 0.2 秒后超时(在这种情况下,装饰函数将始终超时,因为在那个时候它甚至没有产生)?

嵌套装饰器也可能很糟糕,您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,则需要考虑(并测试)所有这些。

其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用 - 不支持记录到另一个进程中的文件)

我试图涵盖所有边缘情况,您可能会查看包 wrapt_timeout_decorator,或者至少测试您自己的解决方案,其灵感来自那里使用的单元测试。

@Alexis Eggermont - 不幸的是我没有足够的分数来评论 - 也许其他人可以通知你 - 我想我解决了你的导入问题。

  • @Arjun Sankarlal:当然,如果工人被杀,管道就会破裂。您需要捕获调度程序任务上的损坏管道错误并正确清理。 (2认同)

boo*_*gie 12

通过@piro 构建和增强答案,您可以构建一个上下文管理器。这允许非常易读的代码,在成功运行后将禁用警报信号(设置 signal.alarm(0))

from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise Exception(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    yield
    signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')
Run Code Online (Sandbox Code Playgroud)

用法示例:

In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:     
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:         
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
      1 with timeout(2):
----> 2     sleeper(3)
      3 

<ipython-input-7-a75b966bf7ac> in sleeper(t)
      1 def sleeper(t):
----> 2     time.sleep(t)
      3     print('finished')
      4 

<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
      2 def timeout(duration):
      3     def timeout_handler(signum, frame):
----> 4         raise Exception(f'block timedout after {duration} seconds')
      5     signal.signal(signal.SIGALRM, timeout_handler)
      6     signal.alarm(duration)

Exception: block timedout after 2 seconds
Run Code Online (Sandbox Code Playgroud)

  • 该上下文管理器当前实现的一个问题是上下文内代码块内的异常可能导致信号警报未被禁用。要修复它,应该添加“try”+“finally”。类似于下面的超时函数装饰器(/sf/answers/4656117301/) (3认同)

Gil*_*Gil 8

出色,易于使用且可靠的PyPi项目超时装饰器https://pypi.org/project/timeout-decorator/

安装方式

pip install timeout-decorator
Run Code Online (Sandbox Code Playgroud)

用法

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()
Run Code Online (Sandbox Code Playgroud)

  • 我很欣赏清晰的解决方案。但是谁能解释这个库是如何工作的,尤其是在处理多线程时。我个人害怕使用未知的机制来处理线程或信号。 (4认同)

as *_* if 7

timeout-decorator不能在 windows 系统上工作,因为 windows 没有很好的支持signal

如果你在 windows 系统中使用 timeout-decorator 你会得到以下结果

AttributeError: module 'signal' has no attribute 'SIGALRM'
Run Code Online (Sandbox Code Playgroud)

有些人建议使用use_signals=False但对我不起作用。

作者@bitranox 创建了以下包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
Run Code Online (Sandbox Code Playgroud)

代码示例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

给出以下异常:

TimeoutError: Function mytest timed out after 5 seconds
Run Code Online (Sandbox Code Playgroud)


jos*_*.rf 7

为了防止它对任何人有帮助,在@piro的答案的基础上,我制作了一个函数装饰器:

import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register an handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)

            return result

        return time_limited

    return wrapper
Run Code Online (Sandbox Code Playgroud)

在具有超时的函数上使用包装器20 seconds将类似于:

    @timeout(20)
    def my_slow_or_never_ending_function(name):
        while True:
            time.sleep(1)
            print(f"Yet another second passed {name}...")

    try:
        results = my_slow_or_never_ending_function("Yooo!")
    except Exception as e:
        print(f"ERROR: {e}")
Run Code Online (Sandbox Code Playgroud)


A R*_*A R 5

我们可以使用信号来实现同样的目的。我认为下面的例子对你有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
Run Code Online (Sandbox Code Playgroud)

  • 最好选择一个特定的异常并仅捕获它。仅仅“尝试:...除了:...”总是一个坏主意。 (2认同)

Ale*_*ane 5

强调

  • RaisesTimeoutError使用异常来提醒超时 - 可以轻松修改
  • 跨平台:Windows & Mac OS X
  • 兼容性:Python 3.6+(我也在 python 2.7 上进行了测试,它可以通过小的语法调整工作)

有关并行地图的完整解释和扩展,请参见此处https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

最小示例

>>> @killer_call(timeout=4)
... def bar(x):
...        import time
...        time.sleep(x)
...        return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s
Run Code Online (Sandbox Code Playgroud)

正如预期的那样

>>> bar(2)
2
Run Code Online (Sandbox Code Playgroud)

完整代码

import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any

class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners

if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)
Run Code Online (Sandbox Code Playgroud)

笔记

由于工作方式,您需要在函数内部导入dill

这也意味着doctest如果目标函数中有导入,这些函数可能不兼容。您会遇到__import__找不到的问题。