我需要处理来自服务器的几页数据。我想为它制作一个这样的发电机。不幸的是我得到TypeError: 'async_generator' object is not iterable
async def get_data():
i = 0
while i < 3:
i += 1
data = await http_call() # call to http-server here
yield data
data = [i for i in get_data()] # inside a loop
Run Code Online (Sandbox Code Playgroud)
下一个变体加注 TypeError: object async_generator can't be used in 'await' expression
data = [i for i in await get_data()] # inside a loop
Run Code Online (Sandbox Code Playgroud) 我已经熟悉Python,但我刚刚开始学习“异步编程”和“io任务”主题。在教程中,我看到它asyncio.Queue()与生产者和消费者相关的东西一起使用,但我不太理解它。
有人可以简化一下,并举几个例子吗?
我有以下场景:
aiohttp)并将调用结果保存到 Mongo(使用motor)。所以有很多 IO 发生。该代码是使用async/编写的await,对于手动执行的单个调用来说效果很好。
我不知道该怎么做是批量使用输入数据。
asyncio我见过的所有示例都是asyncio.wait通过发送有限列表作为参数来演示的。但我不能简单地向其发送任务列表,因为输入文件可能有数百万行。
我的场景是通过传送带将数据流向消费者。
我还可以做些什么?我希望程序使用它可以聚集的所有资源来处理文件中的数据,但又不会被淹没。
我使用这个答案中的代码,但是asyncio.exceptions.CancelledError当队列为空时得到。在实际项目中,我将任务添加到消费者的队列中,这就是我使用while True语句的原因
我压缩该代码以使调试更容易:
import asyncio
import traceback
async def consumer(queue: asyncio.Queue):
try:
while True:
number = await queue.get() # here is exception
queue.task_done()
print(f'consumed {number}')
except BaseException:
traceback.print_exc()
async def main():
queue = asyncio.Queue()
for i in range(3):
await queue.put(i)
consumers = [asyncio.create_task(consumer(queue)) for _ in range(1)]
await queue.join()
for c in consumers:
c.cancel()
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
和错误:
consumed 0
consumed 1
consumed 2
Traceback (most recent call last):
File "/Users/abionics/Downloads/BaseAsyncScraper/ttt.py", line 8, in consumer
number …Run Code Online (Sandbox Code Playgroud)