Tei*_*ion 259 python multithreading timeout python-multithreading
我正在调用Python中的一个函数,我知道它可能会停止并迫使我重新启动脚本.
如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?
pir*_*iro 204
如果在UNIX上运行,则可以使用信号包:
In [1]: import signal
# Register an handler for the timeout
In [2]: def handler(signum, frame):
...: print "Forever is over!"
...: raise Exception("end of time")
...:
# This function *may* run for an indetermined time...
In [3]: def loop_forever():
...: import time
...: while 1:
...: print "sec"
...: time.sleep(1)
...:
...:
# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0
# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0
In [6]: try:
...: loop_forever()
...: except Exception, exc:
...: print exc
....:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time
# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
Run Code Online (Sandbox Code Playgroud)
调用10秒后,调用alarm.alarm(10)
处理程序.这引发了一个例外,您可以从常规Python代码中截取.
这个模块不适合线程(但那么,谁呢?)
请注意,由于我们在超时发生时引发异常,因此它可能最终被捕获并在函数内被忽略,例如一个这样的函数:
def loop_forever():
while 1:
print 'sec'
try:
time.sleep(10)
except:
continue
Run Code Online (Sandbox Code Playgroud)
ATO*_*TOA 136
您可以使用multiprocessing.Process
这样做.
码
import multiprocessing
import time
# bar
def bar():
for i in range(100):
print "Tick"
time.sleep(1)
if __name__ == '__main__':
# Start bar as a process
p = multiprocessing.Process(target=bar)
p.start()
# Wait for 10 seconds or until process finishes
p.join(10)
# If thread is still active
if p.is_alive():
print "running... let's kill it..."
# Terminate
p.terminate()
p.join()
Run Code Online (Sandbox Code Playgroud)
Aar*_*all 70
如何调用该函数或将其包装在哪里,以便如果脚本取消时间超过5秒?
我发布了一个要点,用装饰器来解决这个问题/问题threading.Timer
.这是故障.
它使用Python 2和3进行了测试.它也应该在Unix/Linux和Windows下运行.
首先是进口.无论Python版本如何,这些都试图保持代码一致:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
import thread
except ImportError:
import _thread as thread
Run Code Online (Sandbox Code Playgroud)
使用版本无关的代码:
try:
range, _print = xrange, print
def print(*args, **kwargs):
flush = kwargs.pop('flush', False)
_print(*args, **kwargs)
if flush:
kwargs.get('file', sys.stdout).flush()
except NameError:
pass
Run Code Online (Sandbox Code Playgroud)
现在我们从标准库中导入了我们的功能.
exit_after
装饰接下来我们需要一个函数来终止main()
子线程:
def quit_function(fn_name):
# print to stderr, unbuffered in Python 2.
print('{0} took too long'.format(fn_name), file=sys.stderr)
sys.stderr.flush() # Python 3 stderr is likely buffered.
thread.interrupt_main() # raises KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)
这是装饰者本身:
def exit_after(s):
'''
use as decorator to exit process if
function takes longer than s seconds
'''
def outer(fn):
def inner(*args, **kwargs):
timer = threading.Timer(s, quit_function, args=[fn.__name__])
timer.start()
try:
result = fn(*args, **kwargs)
finally:
timer.cancel()
return result
return inner
return outer
Run Code Online (Sandbox Code Playgroud)
这里的用法直接回答了你关于5秒后退出的问题!:
@exit_after(5)
def countdown(n):
print('countdown started', flush=True)
for i in range(n, -1, -1):
print(i, end=', ', flush=True)
sleep(1)
print('countdown finished')
Run Code Online (Sandbox Code Playgroud)
演示:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)
第二个函数调用不会完成,而是进程应该以traceback退出!
KeyboardInterrupt
并不总是停止睡眠线程请注意,Windows上的Python 2上的键盘中断并不总是会中断睡眠,例如:
@exit_after(1)
def sleep10():
sleep(10)
print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
Run Code Online (Sandbox Code Playgroud)
除非明确检查PyErr_CheckSignals()
,否则它是否可能会中断在扩展中运行的代码,请参阅 Cython,Python和KeyboardInterrupt忽略
在任何情况下,我都会避免睡一个线程超过一秒 - 这是处理器时间的一个因素.
如何调用该函数或我将其包装成什么,以便如果它花费的时间超过5秒,脚本会取消它并执行其他操作?
要捕获它并执行其他操作,您可以捕获KeyboardInterrupt.
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
Run Code Online (Sandbox Code Playgroud)
Ale*_*lex 48
我有一个不同的提议,它是一个纯函数(使用与线程建议相同的API)并且似乎工作正常(基于此线程的建议)
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
import signal
class TimeoutError(Exception):
pass
def handler(signum, frame):
raise TimeoutError()
# set the timeout handler
signal.signal(signal.SIGALRM, handler)
signal.alarm(timeout_duration)
try:
result = func(*args, **kwargs)
except TimeoutError as exc:
result = default
finally:
signal.alarm(0)
return result
Run Code Online (Sandbox Code Playgroud)
Ric*_*ich 29
在单元测试中搜索超时调用时,我遇到了这个线程.我没有在答案或第三方包中找到任何简单的内容,所以我在下面编写了装饰器,你可以直接进入代码:
import multiprocessing.pool
import functools
def timeout(max_timeout):
"""Timeout decorator, parameter in seconds."""
def timeout_decorator(item):
"""Wrap the original function."""
@functools.wraps(item)
def func_wrapper(*args, **kwargs):
"""Closure for function."""
pool = multiprocessing.pool.ThreadPool(processes=1)
async_result = pool.apply_async(item, args, kwargs)
# raises a TimeoutError if execution exceeds max_timeout
return async_result.get(max_timeout)
return func_wrapper
return timeout_decorator
Run Code Online (Sandbox Code Playgroud)
然后就这么简单来超时测试或任何你喜欢的功能:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
Run Code Online (Sandbox Code Playgroud)
Bri*_*ian 16
有很多建议,但没有使用concurrent.futures,我认为这是最清晰的处理方式.
from concurrent.futures import ProcessPoolExecutor
# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
with ProcessPoolExecutor() as p:
f = p.submit(fnc, *args, **kwargs)
return f.result(timeout=5)
Run Code Online (Sandbox Code Playgroud)
超级简单的阅读和维护.
我们创建一个池,提交一个进程,然后等待最多5秒钟,然后提出一个TimeoutError,你可以捕获并处理你需要的东西.
原生于python 3.2+并向后移植到2.7(pip install期货).
线程和进程之间的切换是在更换为简单ProcessPoolExecutor
用ThreadPoolExecutor
.
如果你想在超时时终止进程,我建议你看看Pebble.
bit*_*nox 14
我是 wrapt_timeout_decorator 的作者
乍一看,这里介绍的大多数解决方案在 Linux 下都能很好地工作——因为我们有 fork() 和信号 ()——但在 Windows 上,情况看起来有点不同。当谈到 Linux 上的子线程时,你不能再使用信号了。
为了在 Windows 下生成一个进程,它需要是可腌制的——而许多装饰函数或类方法不是。
所以你需要使用更好的pickler,比如dill和multiprocess(不是pickle和multiprocessing)——这就是为什么你不能使用ProcessPoolExecutor(或只能使用有限的功能)。
对于超时本身 - 您需要定义超时的含义 - 因为在 Windows 上,生成进程需要相当长的(且无法确定的)时间。这在短超时时可能会很棘手。让我们假设,产生这个过程大约需要 0.5 秒(很容易!!!)。如果你给出 0.2 秒的超时时间会发生什么?函数是否应该在 0.5 + 0.2 秒后超时(让方法运行 0.2 秒)?或者被调用的进程应该在 0.2 秒后超时(在这种情况下,装饰函数将始终超时,因为在那个时候它甚至没有产生)?
嵌套装饰器也可能很糟糕,您不能在子线程中使用信号。如果您想创建一个真正通用的跨平台装饰器,则需要考虑(并测试)所有这些。
其他问题是将异常传递回调用者,以及记录问题(如果在装饰函数中使用 - 不支持记录到另一个进程中的文件)
我试图涵盖所有边缘情况,您可能会查看包 wrapt_timeout_decorator,或者至少测试您自己的解决方案,其灵感来自那里使用的单元测试。
@Alexis Eggermont - 不幸的是我没有足够的分数来评论 - 也许其他人可以通知你 - 我想我解决了你的导入问题。
boo*_*gie 12
通过@piro 构建和增强答案,您可以构建一个上下文管理器。这允许非常易读的代码,在成功运行后将禁用警报信号(设置 signal.alarm(0))
from contextlib import contextmanager
import signal
import time
@contextmanager
def timeout(duration):
def timeout_handler(signum, frame):
raise Exception(f'block timedout after {duration} seconds')
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(duration)
yield
signal.alarm(0)
def sleeper(duration):
time.sleep(duration)
print('finished')
Run Code Online (Sandbox Code Playgroud)
用法示例:
In [19]: with timeout(2):
...: sleeper(1)
...:
finished
In [20]: with timeout(2):
...: sleeper(3)
...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
Run Code Online (Sandbox Code Playgroud)
出色,易于使用且可靠的PyPi项目超时装饰器(https://pypi.org/project/timeout-decorator/)
安装方式:
pip install timeout-decorator
Run Code Online (Sandbox Code Playgroud)
用法:
import time
import timeout_decorator
@timeout_decorator.timeout(5)
def mytest():
print "Start"
for i in range(1,10):
time.sleep(1)
print "%d seconds have passed" % i
if __name__ == '__main__':
mytest()
Run Code Online (Sandbox Code Playgroud)
timeout-decorator
不能在 windows 系统上工作,因为 windows 没有很好的支持signal
。
如果你在 windows 系统中使用 timeout-decorator 你会得到以下结果
AttributeError: module 'signal' has no attribute 'SIGALRM'
Run Code Online (Sandbox Code Playgroud)
有些人建议使用use_signals=False
但对我不起作用。
作者@bitranox 创建了以下包:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
Run Code Online (Sandbox Code Playgroud)
代码示例:
import time
from wrapt_timeout_decorator import *
@timeout(5)
def mytest(message):
print(message)
for i in range(1,10):
time.sleep(1)
print('{} seconds have passed'.format(i))
def main():
mytest('starting')
if __name__ == '__main__':
main()
Run Code Online (Sandbox Code Playgroud)
给出以下异常:
TimeoutError: Function mytest timed out after 5 seconds
Run Code Online (Sandbox Code Playgroud)
为了防止它对任何人有帮助,在@piro的答案的基础上,我制作了一个函数装饰器:
import time
import signal
from functools import wraps
def timeout(timeout_secs: int):
def wrapper(func):
@wraps(func)
def time_limited(*args, **kwargs):
# Register an handler for the timeout
def handler(signum, frame):
raise Exception(f"Timeout for function '{func.__name__}'")
# Register the signal function handler
signal.signal(signal.SIGALRM, handler)
# Define a timeout for your function
signal.alarm(timeout_secs)
result = None
try:
result = func(*args, **kwargs)
except Exception as exc:
raise exc
finally:
# disable the signal alarm
signal.alarm(0)
return result
return time_limited
return wrapper
Run Code Online (Sandbox Code Playgroud)
在具有超时的函数上使用包装器20 seconds
将类似于:
@timeout(20)
def my_slow_or_never_ending_function(name):
while True:
time.sleep(1)
print(f"Yet another second passed {name}...")
try:
results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
print(f"ERROR: {e}")
Run Code Online (Sandbox Code Playgroud)
我们可以使用信号来实现同样的目的。我认为下面的例子对你有用。与线程相比,它非常简单。
import signal
def timeout(signum, frame):
raise myException
#this is an infinite loop, never ending under normal circumstances
def main():
print 'Starting Main ',
while 1:
print 'in main ',
#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)
#change 5 to however many seconds you need
signal.alarm(5)
try:
main()
except myException:
print "whoops"
Run Code Online (Sandbox Code Playgroud)
TimeoutError
使用异常来提醒超时 - 可以轻松修改有关并行地图的完整解释和扩展,请参见此处https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts
>>> @killer_call(timeout=4)
... def bar(x):
... import time
... time.sleep(x)
... return x
>>> bar(10)
Traceback (most recent call last):
...
__main__.TimeoutError: function 'bar' timed out after 4s
Run Code Online (Sandbox Code Playgroud)
正如预期的那样
>>> bar(2)
2
Run Code Online (Sandbox Code Playgroud)
import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill
from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
class TimeoutError(Exception):
def __init__(self, func: Callable, timeout: int):
self.t = timeout
self.fname = func.__name__
def __str__(self):
return f"function '{self.fname}' timed out after {self.t}s"
def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
"""lemmiwinks crawls into the unknown"""
q.put(dill.loads(func)(*args, **kwargs))
def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
"""
Single function call with a timeout
Args:
func: the function
timeout: The timeout in seconds
"""
if not isinstance(timeout, int):
raise ValueError(f'timeout needs to be an int. Got: {timeout}')
if func is None:
return functools.partial(killer_call, timeout=timeout)
@functools.wraps(killer_call)
def _inners(*args, **kwargs) -> Any:
q_worker = mp.Queue()
proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
proc.start()
try:
return q_worker.get(timeout=timeout)
except mpq.Empty:
raise TimeoutError(func, timeout)
finally:
try:
proc.terminate()
except:
pass
return _inners
if __name__ == '__main__':
@killer_call(timeout=4)
def bar(x):
import time
time.sleep(x)
return x
print(bar(2))
bar(10)
Run Code Online (Sandbox Code Playgroud)
由于工作方式,您需要在函数内部导入dill
。
这也意味着doctest
如果目标函数中有导入,这些函数可能不兼容。您会遇到__import__
找不到的问题。
归档时间: |
|
查看次数: |
266679 次 |
最近记录: |