2Cu*_*bed 52 python tornado python-3.x python-asyncio python-3.5
我正在迁移tornado到asyncio,而且我找不到asyncio相同tornado的东西PeriodicCallback.(A PeriodicCallback有两个参数:运行的函数和调用之间的毫秒数.)
asyncio?RecursionError一会儿的风险?A. *_*vis 38
对于低于3.5的Python版本:
import asyncio
@asyncio.coroutine
def periodic():
while True:
print('periodic')
yield from asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Run Code Online (Sandbox Code Playgroud)
对于Python 3.5及更高版本:
import asyncio
async def periodic():
while True:
print('periodic')
await asyncio.sleep(1)
def stop():
task.cancel()
loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
Run Code Online (Sandbox Code Playgroud)
Mik*_*mov 20
当你觉得某些东西应该在你的asyncio程序的"后台"中发生时,asyncio.Task可能是一个很好的方法.您可以阅读此文章以了解如何使用任务.
这里可能实现定期执行某些函数的类:
import asyncio
from contextlib import suppress
class Periodic:
def __init__(self, func, time):
self.func = func
self.time = time
self.is_started = False
self._task = None
async def start(self):
if not self.is_started:
self.is_started = True
# Start task to call func periodically:
self._task = asyncio.ensure_future(self._run())
async def stop(self):
if self.is_started:
self.is_started = False
# Stop task and await it stopped:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
async def _run(self):
while True:
await asyncio.sleep(self.time)
self.func()
Run Code Online (Sandbox Code Playgroud)
我们来测试一下:
async def main():
p = Periodic(lambda: print('test'), 1)
try:
print('Start')
await p.start()
await asyncio.sleep(3.1)
print('Stop')
await p.stop()
await asyncio.sleep(3.1)
print('Start')
await p.start()
await asyncio.sleep(3.1)
finally:
await p.stop() # we should stop task finally
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
输出:
Start
test
test
test
Stop
Start
test
test
test
[Finished in 9.5s]
Run Code Online (Sandbox Code Playgroud)
正如你所看到的,start我们只是启动调用某些函数的任务,并在无限循环中休眠一段时间.在stop我们刚刚取消该任务.注意,该任务应该在程序完成时停止.
另一个重要的事情是你的回调不应该花费太多时间来执行(或者它会冻结你的事件循环).如果您计划调用一些长时间运行func,您可能需要在执行程序中运行它.
Mar*_*ers 17
对于定期呼叫没有内置支持,没有.
只需创建自己的调度程序循环即可休眠并执行计划的任何任务:
import math, time
async def scheduler():
while True:
# sleep until the next whole second
now = time.time()
await asyncio.sleep(math.ceil(now) - now)
# execute any scheduled tasks
await for task in scheduled_tasks(time.time()):
await task()
Run Code Online (Sandbox Code Playgroud)
该scheduled_tasks()迭代器应该产生对于准备在给定的时间运行的任务.请注意,制定计划并启动所有任务理论上可能需要超过1秒; 这里的想法是调度程序产生自上次检查以来应该已经开始的所有任务.
小智 14
一个可能有用的变体:如果您希望重复调用每 n 秒发生一次,而不是在上次执行结束和下一次开始之间的 n 秒,并且您不希望调用在时间上重叠,则以下更简单:
async def repeat(interval, func, *args, **kwargs):
"""Run func every interval seconds.
If func has not finished before *interval*, will run again
immediately when the previous iteration finished.
*args and **kwargs are passed as the arguments to func.
"""
while True:
await asyncio.gather(
func(*args, **kwargs),
asyncio.sleep(interval),
)
Run Code Online (Sandbox Code Playgroud)
以及使用它在后台运行几个任务的示例:
async def f():
await asyncio.sleep(1)
print('Hello')
async def g():
await asyncio.sleep(0.5)
print('Goodbye')
async def main():
t1 = asyncio.ensure_future(repeat(3, f))
t2 = asyncio.ensure_future(repeat(2, g))
await t1
await t2
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)
小智 10
python 3.7 带有装饰器的替代版本
import asyncio
import time
def periodic(period):
def scheduler(fcn):
async def wrapper(*args, **kwargs):
while True:
asyncio.create_task(fcn(*args, **kwargs))
await asyncio.sleep(period)
return wrapper
return scheduler
@periodic(2)
async def do_something(*args, **kwargs):
await asyncio.sleep(5) # Do some heavy calculation
print(time.time())
if __name__ == '__main__':
asyncio.run(do_something('Maluzinha do papai!', secret=42))
Run Code Online (Sandbox Code Playgroud)
基于@A。Jesse Jiryu Davis 的回答(@Torkel Bj\xc3\xb8rnson-Langen 和 @ReWrite 评论)这是一项避免漂移的改进。
\nimport time\nimport asyncio\n\n@asyncio.coroutine\ndef periodic(period):\n def g_tick():\n t = time.time()\n count = 0\n while True:\n count += 1\n yield max(t + count * period - time.time(), 0)\n g = g_tick()\n\n while True:\n print(\'periodic\', time.time())\n yield from asyncio.sleep(next(g))\n\nloop = asyncio.get_event_loop()\ntask = loop.create_task(periodic(1))\nloop.call_later(5, task.cancel)\n\ntry:\n loop.run_until_complete(task)\nexcept asyncio.CancelledError:\n pass\nRun Code Online (Sandbox Code Playgroud)\n