如何在 Python 的 `asyncio.gather` 中正确处理取消的任务

Max*_*art 5 python python-asyncio

因此asyncio,既然 3.8 已发布,我将再次尝试该模块。但是,在尝试正常关闭事件循环时,我得到了意想不到的结果。具体来说,我正在侦听 a SIGINT,取消正在运行的Tasks,收集这些Tasks,然后.stop()执行事件循环。我知道当它们被取消时Tasks 会引发一个CancelledError,这将传播并结束我的调用,asyncio.gather除非根据文档,我传递return_exceptions=Trueasyncio.gather,这应该导致gather等待所有Tasks 取消并返回一个CancelledErrors数组。但是,如果我尝试取消s ,似乎return_exceptions=True仍然会立即中断我的gather通话。gatherTask

这是重现效果的代码。我正在运行 python 3.8.0:

# demo.py

import asyncio
import random
import signal


async def worker():
    sleep_time = random.random() * 3
    await asyncio.sleep(sleep_time)
    print(f"Slept for {sleep_time} seconds")

async def dispatcher(queue):
    while True:
        await queue.get()
        asyncio.create_task(worker())
        tasks = asyncio.all_tasks()
        print(f"Running Tasks: {len(tasks)}")

async def shutdown(loop):
    tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
    for task in tasks:
        task.cancel()
    print(f"Cancelling {len(tasks)} outstanding tasks")
    results = await asyncio.gather(*tasks, return_exceptions=True)
    print(f"results: {results}")
    loop.stop()

async def main():
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGINT, lambda: asyncio.create_task(shutdown(loop)))
    queue = asyncio.Queue()
    asyncio.create_task(dispatcher(queue))

    while True:
        await queue.put('tick')
        await asyncio.sleep(1)


asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

输出:

>> python demo.py 
Running Tasks: 3
Slept for 0.3071352174511871 seconds
Running Tasks: 3
Running Tasks: 4
Slept for 0.4152310498820644 seconds
Running Tasks: 4
^CCancelling 4 outstanding tasks
Traceback (most recent call last):
  File "demo.py", line 38, in <module>
    asyncio.run(main())
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/max.taggart/.pyenv/versions/3.8.0/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError
Run Code Online (Sandbox Code Playgroud)

我猜测事件循环仍有一些我不明白的地方,但我希望所有的CancelledErrors 都作为存储在其中的对象数组返回results,然后能够继续而不是立即看到错误.

Mik*_*mov 7

导致错误的原因是什么?

使用的问题asyncio.all_tasks()是它返回所有任务,即使是那些你没有直接创建的任务。按照以下方式更改您的代码以查看您取消的内容:

for task in tasks:
    print(task)
    task.cancel()
Run Code Online (Sandbox Code Playgroud)

您不仅会看到worker相关任务,还会看到:

<Task pending coro=<main() running at ...>
Run Code Online (Sandbox Code Playgroud)

取消main会导致内部混乱,asyncio.run(main())并且会出错。让我们进行快速/脏修改以从取消中排除此任务:

tasks = [
    t 
    for t 
    in asyncio.all_tasks() 
    if (
        t is not asyncio.current_task()
        and t._coro.__name__ != 'main'
    )
]

for task in tasks:
    print(task)
    task.cancel()
Run Code Online (Sandbox Code Playgroud)

现在你会看到你的results.

loop.stop() 导致错误

当你实现时results,你会得到另一个错误Event loop stopped before Future completed。它发生是因为asyncio.run(main())想要运行直到main()完成。

您必须重构您的代码以允许asyncio.run完成您传入的协程,而不是停止事件循环,或者,例如,使用loop.run_forever()而不是asyncio.run.

这是我的意思的快速/肮脏演示:

async def shutdown(loop):
    # ...

    global _stopping
    _stopping = True
    # loop.stop()

_stopping = False

async def main():
    # ...

    while not _stopping:
        await queue.put('tick')
        await asyncio.sleep(1)
Run Code Online (Sandbox Code Playgroud)

现在您的代码将正常工作而不会出错。不要在实践中使用上面的代码,这只是一个例子。尝试按照我上面提到的方式重构您的代码。

如何正确处理任务

不要使用asyncio.all_tasks().

如果您创建了一些要在将来取消的任务,请存储它并仅取消存储的任务。伪代码:

i_created = []

# ...

task = asyncio.create_task(worker())
i_created.append(task)

# ...

for task in i_created:
    task.cancel()
Run Code Online (Sandbox Code Playgroud)

这可能看起来不方便,但它是一种确保您不会取消不想被取消的东西的方法。

还有一件事

还需要注意的是asyncio.run()更多的不仅仅是启动事件循环。特别是,它会在完成之前取消所有挂起的任务。在某些情况下它可能很有用,但我建议改为手动处理所有取消。