所以我了解如何通过列表创建异步任务集合并使用 asyncio.gather() 来执行它们,但我不知道如何对字典执行相同操作:
from fastapi import FastAPI
import asyncio
import httpx
app = FastAPI()
urls = [
"http://www.google.com",
"https://www.example.org",
"https://stackoverflow.com/",
"https://www.wikipedio.org"
]
async def async_request_return_dict(url: str):
async with httpx.AsyncClient() as client:
r = await client.get(url)
return {url : r.status_code}
# List of async coroutines
@app.get("/async_list")
async def async_list():
tasks = []
for url in urls:
tasks.append(async_request_return_dict(url=url))
response = await asyncio.gather(*tasks)
# Convert list of dict -> dict
dict_response = dict()
for url in response:
dict_response.update(url)
return dict_response
async def async_request_return_status(url: …Run Code Online (Sandbox Code Playgroud)