asyncio.gather() on a list of dicts that have a coroutine field?

ca9*_*3d9 2 python tornado python-asyncio

I have the following two async functions:

from tornado.httpclient import AsyncHTTPClient

async def get_categories(): # return a list of str
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

async def get_details(category): # return a list of dict
    # ....
    http = AsyncHTTPClient()
    resp = await http.fetch(....)
    return [....]

Now I need to write a function that fetches the details for all categories (running the http fetches concurrently) and combines them.

async def get_all_details():
    categories = await get_categories()
    tasks = list(map(lambda x: {'category': x, 'task':get_details(x)}, categories))
    r = await asyncio.gather(*tasks) # error

# need to return [
#   {'category':'aaa', 'detail':'aaa detail 1'}, 
#   {'category':'aaa', 'detail':'aaa detail 2'}, 
#   {'category':'bbb', 'detail':'bbb detail 1'}, 
#   {'category':'bbb', 'detail':'bbb detail 2'}, 
#   {'category':'bbb', 'detail':'bbb detail 3'}, 
#   {'category':'ccc', 'detail':'ccc detail 1'}, 
#   {'category':'ccc', 'detail':'ccc detail 2'}, 
# ]

However, the line marked "# error" raises:

TypeError: unhashable type: 'dict'

tasks has the following value:

[{'category': 'aaa',
  'task': <coroutine object get_docker_list at 0x000001B12B8560C0>},
 {'category': 'bbb',
  'task': <coroutine object get_docker_list at 0x000001B12B856F40>},
 {'category': 'ccc',
  'task': <coroutine object get_docker_list at 0x000001B12B856740>}]

By the way, is there a way to limit the http fetch calls? For example, to run at most four fetches at the same time.

use*_*342 6

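gather accepts coroutines (or other awaitables) as arguments and returns a list of their results in the same order. You are passing it a sequence of dicts, some of whose values are coroutines. gather doesn't know what to do with that and tries to treat the dicts themselves as awaitable objects, which quickly fails. In CPython the failure shows up as the TypeError above, because gather keeps track of its arguments in a dict and a dict cannot be hashed.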
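A small self-contained sketch of the ordering guarantee described above. The delayed coroutine is a hypothetical stand-in for an HTTP fetch (asyncio.sleep replaces the network call); the point is only that results come back as a list in argument order, regardless of which coroutine finishes first.

```python
import asyncio

async def delayed(value, seconds):
    # Hypothetical stand-in for an HTTP fetch: wait, then return a value.
    await asyncio.sleep(seconds)
    return value

async def demo_order():
    # The first coroutine finishes last, yet its result still comes first.
    return await asyncio.gather(
        delayed('slow', 0.02),
        delayed('fast', 0.0),
    )

results = asyncio.run(demo_order())
print(results)
```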

The correct way to produce the list of dicts is to pass only the coroutines to gather, await the results, and process them into new dicts:

async def get_all_details():
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category) for category in category_list]
    )
    return [
        {'category': category, 'details': details}
        for (category, details) in zip(category_list, details_list)
    ]
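The question's expected output has one dict per detail rather than one per category. Assuming get_details returns a list of detail values for its category, the gathered results could be flattened roughly as sketched here. The stub bodies of get_categories and get_details and the name get_all_details_flat are placeholders for illustration, not the real HTTP code.

```python
import asyncio

async def get_categories():
    # Placeholder data instead of a real HTTP request.
    return ['aaa', 'bbb']

async def get_details(category):
    # Placeholder: pretend each category has exactly two details.
    await asyncio.sleep(0)
    return [f'{category} detail 1', f'{category} detail 2']

async def get_all_details_flat():
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category) for category in category_list]
    )
    # Emit one dict per (category, detail) pair, keeping category order.
    return [
        {'category': category, 'detail': detail}
        for category, details in zip(category_list, details_list)
        for detail in details
    ]

flat = asyncio.run(get_all_details_flat())
print(flat)
```

The nested comprehension relies on gather preserving argument order, so zip pairs each category with its own details.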

"By the way, is there a way to limit the http fetch calls? For example, to run at most four fetches at the same time."

A convenient and idiomatic way to limit parallel calls is to use a semaphore:

async def get_details(category, limit):
    # acquiring the semaphore passed as `limit` will allow at most a
    # fixed number of coroutines to proceed concurrently
    async with limit:
        ...  # the rest of the code (the fetch and return from get_details)

async def get_all_details():
    limit = asyncio.Semaphore(4)
    category_list = await get_categories()
    details_list = await asyncio.gather(
        *[get_details(category, limit) for category in category_list]
    )
    ...  # the rest of the code (build and return the list of dicts)
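To check that the semaphore really caps concurrency, here is a runnable sketch under stated assumptions: asyncio.sleep stands in for the http fetch, the category names are made up, and the running and peak counters exist only to observe how many simulated fetches are in flight at once.

```python
import asyncio

running = 0  # simulated fetches currently in flight
peak = 0     # highest value `running` ever reached

async def get_details(category, limit):
    global running, peak
    async with limit:
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for the real http fetch
        running -= 1
        return [f'{category} detail']

async def get_all_details(categories):
    # Create the semaphore inside the running event loop.
    limit = asyncio.Semaphore(4)
    details_list = await asyncio.gather(
        *[get_details(category, limit) for category in categories]
    )
    return [
        {'category': category, 'details': details}
        for category, details in zip(categories, details_list)
    ]

categories = [f'cat{i}' for i in range(10)]  # made-up category names
result = asyncio.run(get_all_details(categories))
print(peak)
```

All ten coroutines are started by gather, but only those holding the semaphore get past the async with line, so peak never exceeds the limit of four.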