asyncio.as_completed是否会产生期货或协同程序?

Bra*_*mon 6 python python-3.x python-asyncio

来自asyncio 文档:

asyncio.as_completed(aws, *, loop=None, timeout=None)

同时在aws集中运行等待对象.返回Future对象的迭代器.返回的每个Future对象表示剩余等待中的最早结果.

我会承担这些的Future对象在描述的方法asyncio.Future:.cancelled(),.exception(),和.result().但似乎所产生的元素只是协程,而不是Future对象.我错过了什么?

这似乎打败了描述.as_completed().如果需要,协程如何"完成" await

>>> import asyncio
>>> import aiohttp
>>> 
>>> async def get(session, url):
...     async with session.request('GET', url) as resp:
...         t = await resp.text()
...         return t
... 
>>> async def bulk_as_completed(urls):
...     async with aiohttp.ClientSession() as session:
...         aws = [get(session, url) for url in urls]
...         for future in asyncio.as_completed(aws):
...             for i in ('cancelled', 'exception', 'result'):
...                 print(hasattr(future, i))
...             print(type(future))
...             try:
...                 result = await future
...             except:
...                 pass
...             else:
...                 print(type(result))
...                 print()
... 
>>> 
>>> urls = (
...     'https://docs.python.org/3/library/asyncio-task.html',
...     'https://docs.python.org/3/library/select.html',
...     'https://docs.python.org/3/library/this-page-will-404.html',
... )
>>> 
>>> asyncio.run(bulk_as_completed(urls))
False
False
False
<class 'coroutine'>
<class 'str'>

False
False
False
<class 'coroutine'>
<class 'str'>

False
False
False
<class 'coroutine'>
<class 'str'>
Run Code Online (Sandbox Code Playgroud)

最终,我关心这个的原因是因为我想让异常像他们一样冒泡asyncio.gather(..., return_exceptions=True). 考虑添加一个伪造的URL,在session.request()调用时会引发:

urls = (
    'https://docs.python.org/3/library/asyncio-task.html',
    'https://docs.python.org/3/library/select.html',
    'https://docs.python.org/3/library/this-page-will-404.html',

    # This URL will raise on session.request().  How can I propagate
    # that exception to the iterator of results?
    'https://asdfasdfasdf-does-not-exist-asdfasdfasdf.com'
)
Run Code Online (Sandbox Code Playgroud)

我会希望能够做的就是这样的事情(使用Future对象的方法,但这些都不是未来的目标可言,这是问题):

async def bulk_as_completed(urls):
    async with aiohttp.ClientSession() as session:
        aws = [get(session, url) for url in urls]
        for future in asyncio.as_completed(aws):
            if future.cancelled():
                res = futures.CancelledError()
            else:
                exc = future.exception()
                if exc is not None:
                    res = exc
                else:
                    res = future.result()
            # ...
            # [Do something with `res`]
Run Code Online (Sandbox Code Playgroud)

use*_*342 8

我希望能够做的是这样的事情[...]

也许不太方便,但您应该能够使用如下代码提取异常:

async def bulk_as_completed(urls):
    async with aiohttp.ClientSession() as session:
        aws = [get(session, url) for url in urls]
        for future in asyncio.as_completed(aws):
            try:
                res = await future
            except Exception as e:
                res = e
            # ...
            # [Do something with `res`]
Run Code Online (Sandbox Code Playgroud)

这[产生协程而不是期货]似乎违背了 的描述.as_completed()。如果我需要等待,协程如何“完成”?

它不是。第一次实现时asyncio.as_completed,异步迭代器还不存在。如果没有异步迭代,就无法在 future 完成时返回它们,因此as_completed可以通过生成(立即)虚拟等待项来伪造它,人们必须等待才能获得实际结果。

即使as_completed产生了实际的 future,它对您的用例也没有帮助,因为如果没有人等待,这些 future 就不会完成。as_completed为了提供生成已完成的future的预期语义,as_completed需要实现异步迭代,其相当于__next__can wait。

令人惊讶的行为as_completed之前已经被提出过,我已经提交了一个问题来通过提供异步迭代来修复它。一旦实现,您的原始代码将仅适用于for更改为async for.