如何定期使用asyncio执行函数?

2Cu*_*bed 52 python tornado python-3.x python-asyncio python-3.5

我正在迁移tornadoasyncio,而且我找不到asyncio相同tornado的东西PeriodicCallback.(A PeriodicCallback有两个参数:运行的函数和调用之间的毫秒数.)

  • 是否有这样的等价物asyncio
  • 如果没有,那么实现这一目标的最简洁方法是什么,而不会冒RecursionError一会儿的风险?

A. *_*vis 38

对于低于3.5的Python版本:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Run Code Online (Sandbox Code Playgroud)

对于Python 3.5及更高版本:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
Run Code Online (Sandbox Code Playgroud)

  • 或者你可以像`loop.call_later(5,task.cancel)`那样调用它. (16认同)
  • 快速说明:不要直接创建`Task`实例; 使用`ensure_future()`函数或`AbstractEventLoop.create_task()`方法.来自[asyncio文档](https://docs.python.org/3.5/library/asyncio-task.html#asyncio.Task). (8认同)
  • Python 3.7 的注释:从 [asyncio 文档](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task) 中,我们应该使用高级 `asyncio.create_task ()` 创建`任务`。 (6认同)
  • 即使在Tornado中,对于使用协程的应用程序,我也建议使用这样的循环而不是`PeriodicCallback`. (4认同)

Mik*_*mov 20

当你觉得某些东西应该在你的asyncio程序的"后台"中发生时,asyncio.Task可能是一个很好的方法.您可以阅读此文章以了解如何使用任务.

这里可能实现定期执行某些函数的类:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()
Run Code Online (Sandbox Code Playgroud)

我们来测试一下:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

输出:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]
Run Code Online (Sandbox Code Playgroud)

正如你所看到的,start我们只是启动调用某些函数的任务,并在无限循环中休眠一段时间.在stop我们刚刚取消该任务.注意,该任务应该在程序完成时停止.

另一个重要的事情是你的回调不应该花费太多时间来执行(或者它会冻结你的事件循环).如果您计划调用一些长时间运行func,您可能需要在执行程序中运行它.


Mar*_*ers 17

对于定期呼叫没有内置支持,没有.

只需创建自己的调度程序循环即可休眠并执行计划的任何任务:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()
Run Code Online (Sandbox Code Playgroud)

scheduled_tasks()迭代器应该产生对于准备在给定的时间运行的任务.请注意,制定计划并启动所有任务理论上可能需要超过1秒; 这里的想法是调度程序产生自上次检查以来应该已经开始的所有任务.

  • @ krs013:那是一个*不同的时钟*;它不一定会给您真实的时间(它取决于事件循环的实现,并且可以测量CPU时间滴答声或其他单调增加的时钟量度)。由于不能保证以秒为单位提供度量,因此此处不应该使用它。 (2认同)

小智 14

一个可能有用的变体:如果您希望重复调用每 n 秒发生一次,而不是在上次执行结束和下一次开始之间的 n 秒,并且您不希望调用在时间上重叠,则以下更简单:

async def repeat(interval, func, *args, **kwargs):
    """Run func every interval seconds.

    If func has not finished before *interval*, will run again
    immediately when the previous iteration finished.

    *args and **kwargs are passed as the arguments to func.
    """
    while True:
        await asyncio.gather(
            func(*args, **kwargs),
            asyncio.sleep(interval),
        )
Run Code Online (Sandbox Code Playgroud)

以及使用它在后台运行几个任务的示例:

async def f():
    await asyncio.sleep(1)
    print('Hello')


async def g():
    await asyncio.sleep(0.5)
    print('Goodbye')


async def main():
    t1 = asyncio.ensure_future(repeat(3, f))
    t2 = asyncio.ensure_future(repeat(2, g))
    await t1
    await t2

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Run Code Online (Sandbox Code Playgroud)

  • 为什么在main()中使用ensure_future?为什么不简单地“等待重复(3,f)”和“等待重复(2,g)”? (2认同)

小智 10

python 3.7 带有装饰器的替代版本

import asyncio
import time


def periodic(period):
    def scheduler(fcn):

        async def wrapper(*args, **kwargs):

            while True:
                asyncio.create_task(fcn(*args, **kwargs))
                await asyncio.sleep(period)

        return wrapper

    return scheduler


@periodic(2)
async def do_something(*args, **kwargs):
    await asyncio.sleep(5)  # Do some heavy calculation
    print(time.time())


if __name__ == '__main__':
    asyncio.run(do_something('Maluzinha do papai!', secret=42))
Run Code Online (Sandbox Code Playgroud)


Woj*_*gda 7

基于@A。Jesse Jiryu Davis 的回答(@Torkel Bj\xc3\xb8rnson-Langen 和 @ReWrite 评论)这是一项避免漂移的改进。

\n
import time\nimport asyncio\n\n@asyncio.coroutine\ndef periodic(period):\n    def g_tick():\n        t = time.time()\n        count = 0\n        while True:\n            count += 1\n            yield max(t + count * period - time.time(), 0)\n    g = g_tick()\n\n    while True:\n        print(\'periodic\', time.time())\n        yield from asyncio.sleep(next(g))\n\nloop = asyncio.get_event_loop()\ntask = loop.create_task(periodic(1))\nloop.call_later(5, task.cancel)\n\ntry:\n    loop.run_until_complete(task)\nexcept asyncio.CancelledError:\n    pass\n
Run Code Online (Sandbox Code Playgroud)\n

  • “periodic”可能应该优先使用“loop.time()”而不是“time.time()”,因为“loop.time()”是“asyncio.sleep()”内部使用的时间引用。`loop.time()` 返回单调时间,而 `time.time()` 返回挂钟时间。例如,当系统管理员修改系统上的日期时,或者当 NTP 调整挂钟时间时,两者会有所不同。 (2认同)