使用asyncio时,如何在关闭事件循环之前完成所有正在运行的任务

der*_*ery 44 python python-3.4 python-asyncio

我有以下代码:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break
Run Code Online (Sandbox Code Playgroud)

我运行此功能直到完成.设置关闭时会出现问题 - 该功能完成,任何挂起的任务都不会运行.(你认为这是一个错误

task: <Task pending coro=<report() running at script.py:33> wait_for=<Future pending cb=[Task._wakeup()]>>
Run Code Online (Sandbox Code Playgroud)

).如何正确安排关机?

为了给出一些上下文,我正在编写一个系统监视器,它每隔5秒从/ proc/stat读取一次,计算该时间段内的CPU使用率,然后将结果发送到服务器.我想继续安排这些监视作业,直到我收到sigterm,当我停止调度,等待所有当前作业完成,然后正常退出.

Mar*_*ard 43

您可以检索未完成的任务并再次运行循环直到完成,然后关闭循环或退出程序.

pending = asyncio.Task.all_tasks()
loop.run_until_complete(asyncio.gather(*pending))
Run Code Online (Sandbox Code Playgroud)
  • pending是待处理任务的列表.
  • asyncio.gather()允许一次等待几个任务.

如果你想确保在协程中完成所有任务(也许你有一个"主"协程),你可以这样做,例如:

@asyncio.coroutine
def do_something_periodically():
    while True:
        asyncio.async(my_expensive_operation())
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*asyncio.Task.all_tasks())
Run Code Online (Sandbox Code Playgroud)

此外,在这种情况下,由于所有任务都是在同一个协同程序中创建的,因此您已经可以访问以下任务:

@asyncio.coroutine
def do_something_periodically():
    tasks = []
    while True:
        tasks.append(asyncio.async(my_expensive_operation()))
        yield from asyncio.sleep(my_interval)
        if shutdown_flag_is_set:
            print("Shutting down")
            break

    yield from asyncio.gather(*tasks)
Run Code Online (Sandbox Code Playgroud)

  • 我可以确认 asyncio 打开供自己使用的唯一文件描述符是选择器和自管道,因此有 3 个文件描述符。Task 对象本身不持有任何资源对象,因此它一定是一个无关的错误。 (4认同)
  • 你的第二个例子不会造成僵局吗?主要任务等待所有其他任务完成,*但它本身就是这些任务之一,*所以这种情况永远不会发生。正确的? (2认同)

Sim*_*Art 33

我注意到一些答案建议使用asyncio.gather(*asyncio.all_tasks()),但问题有时可能是一个无限循环,它等待 完成asyncio.current_task(),这就是它本身。一些答案提出了一些复杂的解决方法,涉及检查coro名称或len(asyncio.all_tasks()),但事实证明,通过利用set操作来做到这一点非常简单:

async def main():
    # Create some tasks.
    for _ in range(10):
        asyncio.create_task(asyncio.sleep(10))
    # Wait for all other tasks to finish other than the current task i.e. main().
    await asyncio.gather(*asyncio.all_tasks() - {asyncio.current_task()})
Run Code Online (Sandbox Code Playgroud)


thr*_*you 9

从Python 3.7开始,上面的答案使用了多个不推荐使用的API(asyncio.async和Task.all_tasks,@ asyncio.coroutine,yield from等),你应该使用它:

import asyncio


async def my_expensive_operation(expense):
    print(await asyncio.sleep(expense, result="Expensive operation finished."))


async def do_something_periodically(expense, interval):
    while True:
        asyncio.create_task(my_expensive_operation(expense))
        await asyncio.sleep(interval)


loop = asyncio.get_event_loop()
coro = do_something_periodically(1, 1)

try:
    loop.run_until_complete(coro)
except KeyboardInterrupt:
    coro.close()
    tasks = asyncio.all_tasks(loop)
    expensive_tasks = {task for task in tasks if task._coro.__name__ != coro.__name__}
    loop.run_until_complete(asyncio.gather(*expensive_tasks))
Run Code Online (Sandbox Code Playgroud)