Asyncio.gather vs asyncio.wait

Cla*_*ude 107 python python-asyncio

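asyncio.gather and asyncio.wait seem to have similar uses: I have a bunch of async things that I want to execute/wait for (not necessarily waiting for one to finish before the next one starts). They use a different syntax and differ in some details, but it seems very un-Pythonic to me to have two functions with such a huge overlap in functionality. What am I missing?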

Udi*_*Udi 132

Although similar in the general case ("run and get results for many tasks"), each function has some specific functionality for other cases:

asyncio.gather()

Returns a Future instance, allowing higher-level grouping of tasks:

import asyncio
from pprint import pprint

import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(1, 3))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[coro("group 1.{}".format(i)) for i in range(1, 6)])
group2 = asyncio.gather(*[coro("group 2.{}".format(i)) for i in range(1, 4)])
group3 = asyncio.gather(*[coro("group 3.{}".format(i)) for i in range(1, 10)])

all_groups = asyncio.gather(group1, group2, group3)

results = loop.run_until_complete(all_groups)

loop.close()

pprint(results)

All tasks in a group can be cancelled by calling group2.cancel(), or even all_groups.cancel(). See also .gather(..., return_exceptions=True).
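
To make the group cancellation concrete, here is a minimal sketch (not part of the original answer). It assumes the group is built inside a coroutine driven by asyncio.run(), and it creates the tasks explicitly only so they can be inspected afterwards; the short delays are arbitrary.

```python
import asyncio


async def coro(tag):
    await asyncio.sleep(0.5)
    return tag


async def main():
    # gather() would wrap bare coroutines in Tasks itself; the tasks are
    # created by hand here only so their state can be checked later.
    tasks = [asyncio.create_task(coro(i)) for i in range(3)]
    group = asyncio.gather(*tasks)

    await asyncio.sleep(0.1)  # give the tasks a chance to start
    group.cancel()            # requests cancellation of every pending child

    try:
        await group
    except asyncio.CancelledError:
        print("group cancelled")

    return [t.cancelled() for t in tasks]


cancelled_flags = asyncio.run(main())
print(cancelled_flags)
```

Cancelling the outer future is enough; there is no need to loop over the children yourself.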

asyncio.wait()

Supports waiting to be stopped after the first task is done, or after a specified timeout, allowing lower-level precision of operations:

import asyncio
import random


async def coro(tag):
    print(">", tag)
    await asyncio.sleep(random.uniform(0.5, 5))
    print("<", tag)
    return tag


loop = asyncio.get_event_loop()

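# asyncio.wait() needs Tasks/Futures: bare coroutines are rejected since Python 3.11.
tasks = [loop.create_task(coro(i)) for i in range(1, 11)]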

print("Get first result:")
finished, unfinished = loop.run_until_complete(
    asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))

for task in finished:
    print(task.result())
print("unfinished:", len(unfinished))

print("Get more results in 2 seconds:")
finished2, unfinished2 = loop.run_until_complete(
    asyncio.wait(unfinished, timeout=2))

for task in finished2:
    print(task.result())
print("unfinished2:", len(unfinished2))

print("Get all other results:")
finished3, unfinished3 = loop.run_until_complete(asyncio.wait(unfinished2))

for task in finished3:
    print(task.result())

loop.close()
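
One detail worth spelling out about the timeout: asyncio.wait() does not cancel anything when the timeout expires; it just hands the unfinished tasks back in the second set. The sketch below is an illustration under assumptions (the `slow` helper and the delays are made up) showing how the leftovers could be cancelled explicitly.

```python
import asyncio


async def slow(tag, delay):
    await asyncio.sleep(delay)
    return tag


async def main():
    tasks = {
        asyncio.create_task(slow("fast", 0.1)),
        asyncio.create_task(slow("slow", 5)),
    }
    done, pending = await asyncio.wait(tasks, timeout=0.5)

    # wait() leaves timed-out tasks running; cancel them if they are unwanted.
    for task in pending:
        task.cancel()
    # Await the cancelled tasks so the cancellation is fully processed.
    await asyncio.gather(*pending, return_exceptions=True)

    return sorted(t.result() for t in done), [t.cancelled() for t in pending]


done_results, pending_cancelled = asyncio.run(main())
print(done_results, pending_cancelled)
```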

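  • In the `asyncio.gather()` code, if the code that creates the three groups is wrapped in a function body, you can get rid of `loop = asyncio.get_event_loop()` and refactor the code by adding `await` to `asyncio.gather(group1, group2, group3)`, which makes it slightly simpler; all the lines related to the loop variable are then no longer needed (3 upvotes)
  • "The single-asterisk form (*args) is used to pass a non-keyworded, variable-length argument list, and the double-asterisk form is used to pass a keyworded, variable-length argument list" (2 upvotes)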

osp*_*der 33

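asyncio.wait is more low-level than asyncio.gather.

As the name suggests, asyncio.gather mainly focuses on gathering the results. It waits on a bunch of futures and returns their results in the order given.

asyncio.wait just waits on the futures. Instead of giving you the results directly, it gives you the done and pending tasks. You have to collect the values manually.

Moreover, with wait you can specify whether to wait for all futures to finish or just for the first one.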
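
A small sketch of that difference, with a made-up `job` helper and arbitrary delays: gather() returns results in argument order regardless of which job finishes first, while wait() returns two unordered sets of tasks from which the values have to be pulled out by hand.

```python
import asyncio


async def job(tag, delay):
    await asyncio.sleep(delay)
    return tag


async def main():
    # gather(): results follow argument order, not completion order.
    gathered = await asyncio.gather(job("a", 0.3), job("b", 0.1), job("c", 0.2))

    # wait(): returns (done, pending) sets of tasks; results are read manually.
    specs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
    tasks = [asyncio.create_task(job(tag, delay)) for tag, delay in specs]
    done, pending = await asyncio.wait(tasks)
    waited = {task.result() for task in done}

    return gathered, waited, len(pending)


gathered, waited, n_pending = asyncio.run(main())
print(gathered, waited, n_pending)
```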

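  • @Kingname ..wat (4 upvotes)
  • @ospider The parameter named `return_when` for `asyncio.wait` is already available in Python 3.5.9! See here: https://docs.python.org/3.5/library/asyncio-task.html#asyncio.wait (3 upvotes)
  • Do you mean that asyncio.gather has to wait for all of them to complete, while asyncio.wait returns the current status of each one (pending or not)? That is not clear to me from reading your answer (2 upvotes)
  • @EigenFool As of Python 3.9, `asyncio.wait` has a parameter called `return_when`, which you can use to control when the event loop hands control back to you. `asyncio.gather` has no such parameter; the event loop only gets back to you when all tasks have finished/failed. Read the official docs here: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait (2 upvotes)
  • @Kingname `python -m timeit "print('hello')"` gives 36.6 usec per loop, so 10000000000000 `print('hello')` calls would take just 11.6 years for the `print()` function alone (2 upvotes)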

Dev*_*wal 24

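A very important distinction, which is easy to miss, is the default behavior of these two functions when it comes to exceptions.


I'll use this example to simulate a coroutine that sometimes raises an exception -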

import asyncio
import random


async def a_flaky_tsk(i):
    await asyncio.sleep(i)  # bit of fuzz to simulate a real-world example

    if i % 2 == 0:
        print(i, "ok")
    else:
        print(i, "crashed!")
        raise ValueError

coros = [a_flaky_tsk(i) for i in range(10)]

await asyncio.gather(*coros) outputs -

0 ok
1 crashed!
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 20, in <module>
    asyncio.run(main())
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Users/dev/.pyenv/versions/3.8.2/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 17, in main
    await asyncio.gather(*coros)
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

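As you can see, the coros after index 1 never ran to completion: gather() propagated the first exception out of main(), and asyncio.run() cancelled the tasks that were still pending when it shut down.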
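
Note that gather() itself does not cancel the remaining awaitables when one of them fails; it only propagates the first exception to whoever awaits it. A rough sketch to check this, using a shortened, hypothetical variant of the flaky coroutine (smaller delays, four tasks):

```python
import asyncio


async def flaky(i):
    await asyncio.sleep(i * 0.1)
    if i % 2:
        raise ValueError(i)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    try:
        await asyncio.gather(*tasks)
    except ValueError as exc:
        print("gather raised:", repr(exc))

    # Task 1 failed first, but tasks 2 and 3 were not cancelled by gather().
    still_running = [not t.done() for t in tasks]

    # Let the rest finish; return_exceptions=True also collects task 3's error.
    rest = await asyncio.gather(*tasks, return_exceptions=True)
    return still_running, rest


still_running, rest = asyncio.run(main())
print(still_running, rest)
```

If main() keeps running after catching the exception, the other tasks carry on in the background.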


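await asyncio.wait(tasks) keeps executing the tasks even if some of them fail (since Python 3.11, wait() rejects bare coroutines, so wrap each one first: tasks = [asyncio.create_task(c) for c in coros]) -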

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
Task exception was never retrieved
future: <Task finished name='Task-10' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-8' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-9' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError
Task exception was never retrieved
future: <Task finished name='Task-3' coro=<a_flaky_tsk() done, defined at /Users/dev/PycharmProjects/trading/xxx.py:6> exception=ValueError()>
Traceback (most recent call last):
  File "/Users/dev/PycharmProjects/trading/xxx.py", line 12, in a_flaky_tsk
    raise ValueError
ValueError

Of course, both of these behaviors can be changed by using -

asyncio.gather(..., return_exceptions=True)

or,

asyncio.wait([...], return_when=asyncio.FIRST_EXCEPTION)
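
As an illustration of the second option (a sketch only, reusing a shortened, hypothetical `flaky` coroutine), FIRST_EXCEPTION makes wait() return as soon as one task raises. The tasks that are still pending are handed back untouched, so this example cancels them itself.

```python
import asyncio


async def flaky(i):
    await asyncio.sleep(i * 0.1)
    if i % 2:
        raise ValueError(i)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

    # Task.exception() returns the exception (or None) without raising it.
    failures = [repr(t.exception()) for t in done if t.exception() is not None]

    # wait() does not cancel the rest; do it explicitly and let it settle.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    return len(done), len(pending), failures


n_done, n_pending, failures = asyncio.run(main())
print(n_done, n_pending, failures)
```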


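But it doesn't end here!

Notice: Task exception was never retrieved in the logs above.

asyncio.wait() won't re-raise exceptions from the child tasks until you await them individually. (The stack traces in the logs are just messages; they cannot be caught!)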

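# wait() needs Tasks/Futures; bare coroutines are rejected since Python 3.11.
tasks = [asyncio.create_task(c) for c in coros]
done, pending = await asyncio.wait(tasks)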
for tsk in done:
    try:
        await tsk
    except Exception as e:
        print("I caught:", repr(e))

Output -

0 ok
1 crashed!
2 ok
3 crashed!
4 ok
5 crashed!
6 ok
7 crashed!
8 ok
9 crashed!
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()
I caught: ValueError()

On the other hand, to catch exceptions with asyncio.gather(), you must -

results = await asyncio.gather(*coros, return_exceptions=True)
for result_or_exc in results:
    if isinstance(result_or_exc, Exception):
        print("I caught:", repr(result_or_exc))

(Same output as before)
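
As a possible alternative to awaiting each finished task inside try/except, a finished task can also be inspected with Task.exception(), which marks the exception as retrieved without raising it. This is only a sketch with a shortened, hypothetical `flaky` coroutine; note that exception() itself raises CancelledError for a cancelled task, which does not happen here.

```python
import asyncio


async def flaky(i):
    await asyncio.sleep(i * 0.1)
    if i % 2:
        raise ValueError(i)
    return i


async def main():
    tasks = [asyncio.create_task(flaky(i)) for i in range(4)]
    await asyncio.wait(tasks)  # default: return when ALL tasks are done

    ok, errors = [], []
    for t in tasks:  # iterate the original list to keep a stable order
        if t.exception() is None:
            ok.append(t.result())
        else:
            errors.append(repr(t.exception()))
    return ok, errors


ok, errors = asyncio.run(main())
print(ok, errors)
```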

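  • Until I saw this post, I never understood the "Task exception was never retrieved" error. Thanks a lot for the great explanation.. (13 upvotes)
  • @SauravKumar Me too! Heck, this was so helpful! (2 upvotes)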

小智 11

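I also noticed that you can provide a group of awaitables to wait() by simply passing a list (since Python 3.11, wait() no longer accepts bare coroutines, so each one is wrapped in a task here):

result=loop.run_until_complete(asyncio.wait([
        loop.create_task(say('first hello', 2)),
        loop.create_task(say('second hello', 1)),
        loop.create_task(say('third hello', 4))
    ]))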

Whereas grouping in gather() is done by just specifying multiple coroutines:

result=loop.run_until_complete(asyncio.gather(
        say('first hello', 2),
        say('second hello', 1),
        say('third hello', 4)
    ))

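  • Lists can also be used with `gather()`, e.g.: `asyncio.gather(*task_list)` (12 upvotes)
  • @thebeancounter You don't need to `await` right away! `group = asyncio.gather(*aws)` **directly returns an awaitable/future for the group**, which represents all the combined tasks. The tasks can run soon after the `asyncio.gather` call, e.g. when there is an `await` for something else (like `asyncio.sleep`) or when accessing the future (like `group.done()`). You only need `await group` when you want to make sure the tasks are done or cancelled and to collect all the results. (5 upvotes)
  • Generators work too (3 upvotes)
  • How can I use this gather without blocking the rest of the script? (2 upvotes)
  • Amazing. Thanks for the very easy-to-read examples. (2 upvotes)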
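
A minimal sketch of the points raised in these comments: a generator expression can be unpacked into gather() just like a list, and the returned group does not have to be awaited immediately. The `say` coroutine below is a hypothetical stand-in, since the answer does not show its definition.

```python
import asyncio


async def say(what, delay):
    # Hypothetical stand-in for the say() used in the answer above.
    await asyncio.sleep(delay)
    return what


async def main():
    log = []

    # A generator expression can be unpacked the same way as a list.
    group = asyncio.gather(*(say(f"hello {i}", 0.1 * i) for i in range(1, 4)))

    # The group is already scheduled; main() can do other work in the meantime.
    log.append("doing other work")
    await asyncio.sleep(0.05)
    log.append(f"group done yet? {group.done()}")

    # Await the group only when the results are actually needed.
    log.append(await group)
    return log


log = asyncio.run(main())
print(log)
```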

ale*_*ame 7

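In addition to all the previous answers, I would like to describe how differently gather() and wait() behave when they are cancelled.

Gather() cancellation

If gather() is cancelled, all submitted awaitables (that have not completed yet) are also cancelled.

Wait() cancellation

If the wait() task is cancelled, it simply throws a CancelledError and the waited tasks remain intact.

Simple example: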

import asyncio


async def task(arg):
    await asyncio.sleep(5)
    return arg


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def main():
    print("asyncio.wait()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.create_task(asyncio.wait({work_task}))
    await cancel_waiting_task(work_task, waiting)

    print("----------------")
    print("asyncio.gather()")
    work_task = asyncio.create_task(task("done"))
    waiting = asyncio.gather(work_task)
    await cancel_waiting_task(work_task, waiting)


asyncio.run(main())

Output:

asyncio.wait()
Waiting task cancelled
Work result: done
----------------
asyncio.gather()
Waiting task cancelled
Work task cancelled

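Sometimes it becomes necessary to combine the functionality of wait() and gather(). For example, we want to wait for at least one task to complete and then cancel the remaining pending tasks, and if the waiting itself is cancelled, cancel all pending tasks as well.

As a real-world example, say we have a disconnect event and a work task. We want to wait for the result of the work task, but cancel it if the connection is lost. Or we make several parallel requests and, as soon as at least one response arrives, cancel all the others.

It could be done this way: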

import asyncio
from typing import Optional, Tuple, Set


async def wait_any(
        tasks: Set[asyncio.Future], *, timeout: Optional[int] = None,
) -> Tuple[Set[asyncio.Future], Set[asyncio.Future]]:
    tasks_to_cancel: Set[asyncio.Future] = set()
    try:
        done, tasks_to_cancel = await asyncio.wait(
            tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
        )
        return done, tasks_to_cancel
    except asyncio.CancelledError:
        tasks_to_cancel = tasks
        raise
    finally:
        for task in tasks_to_cancel:
            task.cancel()


async def task():
    await asyncio.sleep(5)


async def cancel_waiting_task(work_task, waiting_task):
    await asyncio.sleep(2)
    waiting_task.cancel()
    try:
        await waiting_task
        print("Waiting done")
    except asyncio.CancelledError:
        print("Waiting task cancelled")

    try:
        res = await work_task
        print(f"Work result: {res}")
    except asyncio.CancelledError:
        print("Work task cancelled")


async def check_tasks(waiting_task, working_task, waiting_conn_lost_task):
    try:
        await waiting_task
        print("waiting is done")
    except asyncio.CancelledError:
        print("waiting is cancelled")

    try:
        await waiting_conn_lost_task
        print("connection is lost")
    except asyncio.CancelledError:
        print("waiting connection lost is cancelled")

    try:
        await working_task
        print("work is done")
    except asyncio.CancelledError:
        print("work is cancelled")


async def work_done_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def conn_lost_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    connection_lost_event.set()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def cancel_waiting_case():
    working_task = asyncio.create_task(task())
    connection_lost_event = asyncio.Event()
    waiting_conn_lost_task = asyncio.create_task(connection_lost_event.wait())
    waiting_task = asyncio.create_task(wait_any({working_task, waiting_conn_lost_task}))
    await asyncio.sleep(2)
    waiting_task.cancel()  # <---
    await check_tasks(waiting_task, working_task, waiting_conn_lost_task)


async def main():
    print("Work done")
    print("-------------------")
    await work_done_case()
    print("\nConnection lost")
    print("-------------------")
    await conn_lost_case()
    print("\nCancel waiting")
    print("-------------------")
    await cancel_waiting_case()


asyncio.run(main())

Output:

Work done
-------------------
waiting is done
waiting connection lost is cancelled
work is done

Connection lost
-------------------
waiting is done
connection is lost
work is cancelled

Cancel waiting
-------------------
waiting is cancelled
waiting connection lost is cancelled
work is cancelled
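
The other scenario mentioned above (several parallel requests, keep the first response, cancel the rest) is not covered by the demo, so here is a rough sketch of how the same idea might be applied to it. The `fetch` coroutine and the mirror names are hypothetical stand-ins for real requests, `first_response` is an illustrative helper rather than the answer's `wait_any`, and a non-empty list of coroutines is assumed.

```python
import asyncio


async def fetch(name, delay):
    # Hypothetical stand-in for a network request.
    await asyncio.sleep(delay)
    return name


async def first_response(coros):
    tasks = {asyncio.create_task(c) for c in coros}
    try:
        done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
        return done.pop().result()
    finally:
        # Runs both on success and when first_response() itself is cancelled:
        # cancel whatever has not finished (a no-op for completed tasks).
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    return await first_response([
        fetch("mirror-a", 0.3),
        fetch("mirror-b", 0.1),
        fetch("mirror-c", 0.2),
    ])


winner = asyncio.run(main())
print(winner)
```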