如何在 Python 中使用`async for`?

Pal*_*ron 34 python asynchronous python-asyncio

我的意思是我从使用async for. 这是我用 编写的代码async forAIter(10)可以替换为get_range().

但是代码运行起来像同步而不是异步。

import asyncio

async def get_range():
    for i in range(10):
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        yield i

class AIter:
    def __init__(self, N):
        self.i = 0
        self.N = N

    def __aiter__(self):
        return self

    async def __anext__(self):
        i = self.i
        print(f"start {i}")
        await asyncio.sleep(1)
        print(f"end {i}")
        if i >= self.N:
            raise StopAsyncIteration
        self.i += 1
        return i

async def main():
    async for p in AIter(10):
        print(f"finally {p}")

if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

我排除的结果应该是:

start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...
Run Code Online (Sandbox Code Playgroud)

然而,真正的结果是:

start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2
Run Code Online (Sandbox Code Playgroud)

我知道我可以通过使用asyncio.gatheror获得异常结果asyncio.wait

但是我很难理解async for在这里使用而不是 simple得到了什么for

async for如果我想遍历多个Feature对象并在一个对象完成后立即使用它们,正确的使用方法是什么。例如:

async for f in feature_objects:
    data = await f
    with open("file", "w") as fi:
        fi.write()
Run Code Online (Sandbox Code Playgroud)

use*_*342 79

但是我很难理解async for在这里使用而不是 simple得到了什么for

潜在的误解是期望async for自动并行化迭代。它不这样做,它只是允许对异步源进行顺序迭代。例如,您可以使用async for迭代来自 TCP 流的行、来自 websocket 的消息或来自异步数据库驱动程序的数据库记录。

以上都不适用于普通的for,至少在不阻塞事件循环的情况下不能。这是因为作为阻塞函数for调用__next__并且不等待其结果。您不能手动await获得元素,for因为for期望__next__通过提高 来表示迭代结束StopIteration。如果__next__是协程,则StopIteration异常在等待之前将不可见。这就是为什么async for引入,不只是在Python,而且在其他 语言中使用异步/的await和广义for

如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用asyncio.as_completed或等效于在它们出现时检索它们的结果:

async def x(i):
    print(f"start {i}")
    await asyncio.sleep(1)
    print(f"end {i}")
    return i

# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
    result = await f
    # ... do something with the result ...
Run Code Online (Sandbox Code Playgroud)

如果您不关心在结果到达时立即对结果做出反应,但您需要所有这些,您可以使用asyncio.gather以下命令使其变得更加简单:

# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])
Run Code Online (Sandbox Code Playgroud)

  • @Roelant你是对的,一个例子会很有用。这个答案试图解决问题中提出的具体观点,这在当时是有意义的,但降低了其作为一般资源的价值。此时添加一个现实生活中的例子将使答案比现在长很多。希望还有其他问题可以澄清这个问题,如果没有,也许是时候提出一个新问题了。 (3认同)
  • 感谢您提供的这个极其简单而精确的解释。 (2认同)
  • 我喜欢解释器,但缺少“async for”循环的示例 (2认同)

mat*_*129 13

(添加已接受的答案 - 为了查理的赏金)。

假设您想同时使用每个产生的值,一个简单的方法是:

import asyncio

async def process_all():
    tasks = []

    async for obj in my_async_generator:
        # Python 3.7+. Use ensure_future for older versions.
        task = asyncio.create_task(process_obj(obj))
        tasks.append(task)
    
    await asyncio.gather(*tasks)


async def process_obj(obj):
    ...
Run Code Online (Sandbox Code Playgroud)

解释:

考虑以下代码,不带create_task

async def process_all():
    async for obj in my_async_generator:
        await process_obj(obj))
Run Code Online (Sandbox Code Playgroud)

这大致相当于:

async def process_all():
    obj1 = await my_async_generator.__anext__():
    await process_obj(obj1))

    obj2 = await my_async_generator.__anext__():
    await process_obj(obj2))
    
    ...
Run Code Online (Sandbox Code Playgroud)

基本上,循环无法继续,因为它的主体被阻塞。正确的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,gather等待所有任务 - 这意味着要处理每个迭代。