Pal*_*ron 34 python asynchronous python-asyncio
我的意思是我从使用async for
. 这是我用 编写的代码async for
,AIter(10)
可以替换为get_range()
.
但是代码运行起来像同步而不是异步。
import asyncio
async def get_range():
for i in range(10):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
yield i
class AIter:
def __init__(self, N):
self.i = 0
self.N = N
def __aiter__(self):
return self
async def __anext__(self):
i = self.i
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
if i >= self.N:
raise StopAsyncIteration
self.i += 1
return i
async def main():
async for p in AIter(10):
print(f"finally {p}")
if __name__ == "__main__":
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
我排除的结果应该是:
start 1
start 2
start 3
...
end 1
end 2
...
finally 1
finally 2
...
Run Code Online (Sandbox Code Playgroud)
然而,真正的结果是:
start 0
end 0
finally 0
start 1
end 1
finally 1
start 2
end 2
Run Code Online (Sandbox Code Playgroud)
我知道我可以通过使用asyncio.gather
or获得异常结果asyncio.wait
。
但是我很难理解async for
在这里使用而不是 simple得到了什么for
。
async for
如果我想遍历多个Feature
对象并在一个对象完成后立即使用它们,正确的使用方法是什么。例如:
async for f in feature_objects:
data = await f
with open("file", "w") as fi:
fi.write()
Run Code Online (Sandbox Code Playgroud)
use*_*342 79
但是我很难理解
async for
在这里使用而不是 simple得到了什么for
。
潜在的误解是期望async for
自动并行化迭代。它不这样做,它只是允许对异步源进行顺序迭代。例如,您可以使用async for
迭代来自 TCP 流的行、来自 websocket 的消息或来自异步数据库驱动程序的数据库记录。
以上都不适用于普通的for
,至少在不阻塞事件循环的情况下不能。这是因为作为阻塞函数for
调用__next__
并且不等待其结果。您不能手动await
获得元素,for
因为for
期望__next__
通过提高 来表示迭代结束StopIteration
。如果__next__
是协程,则StopIteration
异常在等待之前将不可见。这就是为什么async for
引入,不只是在Python,而且在其他 语言中使用异步/的await和广义for
。
如果要并行运行循环迭代,则需要将它们作为并行协程启动,并使用asyncio.as_completed
或等效于在它们出现时检索它们的结果:
async def x(i):
print(f"start {i}")
await asyncio.sleep(1)
print(f"end {i}")
return i
# run x(0)..x(10) concurrently and process results as they arrive
for f in asyncio.as_completed([x(i) for i in range(10)]):
result = await f
# ... do something with the result ...
Run Code Online (Sandbox Code Playgroud)
如果您不关心在结果到达时立即对结果做出反应,但您需要所有这些,您可以使用asyncio.gather
以下命令使其变得更加简单:
# run x(0)..x(10) concurrently and process results when all are done
results = await asyncio.gather(*[x(i) for i in range(10)])
Run Code Online (Sandbox Code Playgroud)
mat*_*129 13
(添加已接受的答案 - 为了查理的赏金)。
假设您想同时使用每个产生的值,一个简单的方法是:
import asyncio
async def process_all():
tasks = []
async for obj in my_async_generator:
# Python 3.7+. Use ensure_future for older versions.
task = asyncio.create_task(process_obj(obj))
tasks.append(task)
await asyncio.gather(*tasks)
async def process_obj(obj):
...
Run Code Online (Sandbox Code Playgroud)
考虑以下代码,不带create_task
:
async def process_all():
async for obj in my_async_generator:
await process_obj(obj))
Run Code Online (Sandbox Code Playgroud)
这大致相当于:
async def process_all():
obj1 = await my_async_generator.__anext__():
await process_obj(obj1))
obj2 = await my_async_generator.__anext__():
await process_obj(obj2))
...
Run Code Online (Sandbox Code Playgroud)
基本上,循环无法继续,因为它的主体被阻塞。正确的方法是将每次迭代的处理委托给一个新的异步任务,该任务将在不阻塞循环的情况下启动。然后,gather
等待所有任务 - 这意味着要处理每个迭代。
归档时间: |
|
查看次数: |
23012 次 |
最近记录: |