Python Aiohttp Asyncio: how to create a delay between each task

Aar*_*Ahn 4 python api delayed-execution python-asyncio aiohttp

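The problem I'm trying to solve: I'm making many API requests to a server. I'm trying to create a delay between the asynchronous API calls in order to respect the server's rate-limit policy.

What I want it to do: I want it to behave as follows: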

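  1. Make API request #1
  2. Wait 0.1 seconds
  3. Make API request #2
  4. Wait 0.1 seconds, and so on
  5. Repeat until all requests have been made
  6. Collect the responses and return them in a single object (results)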

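The issue: When I introduce asyncio.sleep() or time.sleep() in the code, it still makes the API requests almost instantly. It seems to delay the execution of print(), but not the API requests. I suspect I have to create the delay inside the loop rather than in fetch_one() or fetch_all(), but I can't figure out how to do that.

Code: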

import asyncio
from ssl import SSLContext

import aiohttp

async def fetch_all(loop, urls, delay):
    # gather() schedules every fetch_one() coroutine at once.
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):

    # Neither of these attempts spaced out the requests:
    #time.sleep(delay)
    #asyncio.sleep(delay)

    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            # The response object itself is not awaitable; read its body instead.
            return await resp.text()

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))

Versions I'm using: 
python                    3.8.5
aiohttp                   3.7.4
asyncio                   3.4.3

I'd appreciate any suggestions that point me in the right direction!

jsb*_*eno 5

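Calling asyncio.gather starts all the requests "at the same time". On the other hand, if you simply used a lock, or awaited each task one after another, you would get no benefit from parallelism at all.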
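To see why an identical sleep inside every coroutine does not space anything out, here is a minimal sketch with no network access. `fake_request` is a hypothetical stand-in for fetch_one(), not part of the original code; it only records when each "request" would have fired.

```python
import asyncio
import time


async def fake_request(index, delay, start_offsets, t0):
    # Hypothetical stand-in for fetch_one(): pause, then record
    # the moment the "request" would be sent.
    await asyncio.sleep(delay)
    start_offsets[index] = time.monotonic() - t0


async def main(count=5, delay=0.1):
    start_offsets = {}
    t0 = time.monotonic()
    # gather() starts every coroutine right away, so all the sleeps overlap.
    await asyncio.gather(
        *(fake_request(i, delay, start_offsets, t0) for i in range(count))
    )
    return [start_offsets[i] for i in range(count)]


offsets = asyncio.run(main())
# The offsets come out nearly identical instead of being 0.1 s apart.
print([round(x, 2) for x in offsets])
```

Every coroutine sleeps over the same interval, so they all wake together and the requests would still go out in one burst.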

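If you know the rate at which you are allowed to issue requests, the simplest approach is to increase the asynchronous pause before each successive request. A simple global variable can do that: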


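import asyncio
from ssl import SSLContext

import aiohttp

next_delay = 0.1  # running offset in seconds; grows with every scheduled request

async def fetch_all(loop, urls, delay): 
    results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay):
    global next_delay

    # Each coroutine runs up to its first await without interruption,
    # so every request claims a pause `delay` seconds longer than the previous one.
    next_delay += delay
    await asyncio.sleep(next_delay)

    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(url, ssl=SSLContext()) as resp:
            # print("An api call to ", url, " is made at ", time.time())
            # print(resp)
            # The response object itself is not awaitable; read its body instead.
            return await resp.text()

delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))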
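The same staggering can also be done without shared state by deriving each pause from the request's position in the list. This is only a sketch of that variation: the short asyncio.sleep() stands in for the real aiohttp call, and the example.invalid URLs are placeholders.

```python
import asyncio
import time


async def fetch_one(url, start_after):
    # Wait for this request's own slot before "sending" it.
    await asyncio.sleep(start_after)
    started_at = time.monotonic()
    await asyncio.sleep(0.01)  # placeholder for the real aiohttp request
    return url, started_at


async def fetch_all(urls, delay):
    # Request i starts roughly i * delay seconds after the first one.
    return await asyncio.gather(
        *(fetch_one(url, i * delay) for i, url in enumerate(urls)),
        return_exceptions=True,
    )


urls = [f"https://example.invalid/{i}" for i in range(4)]  # placeholder URLs
results = asyncio.run(fetch_all(urls, 0.05))
for url, started_at in results:
    print(url, round(started_at - results[0][1], 2))
```

Because gather() returns results in the order the awaitables were passed in, the responses still line up with the input list.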

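Now, if you want to make 5 requests and then the next 5, you can use a synchronization primitive such as asyncio.Condition, calling its wait_for on an expression that checks how many API calls are active. The code below does the same kind of gating with an asyncio.Event: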

import asyncio
from ssl import SSLContext

import aiohttp

active_calls = 0

MAX_CALLS = 5

async def fetch_all(loop, urls, delay): 
    event = asyncio.Event()
    event.set()
    results = await asyncio.gather(*[fetch_one(loop, url, delay, event) for url in urls], return_exceptions=True)
    return results

async def fetch_one(loop, url, delay, event):
    global active_calls

    # While the current batch is full, wait until it has drained completely.
    while active_calls >= MAX_CALLS:
        event.clear()
        await event.wait()
    active_calls += 1

    try:
        async with aiohttp.ClientSession(loop=loop) as session:
            async with session.get(url, ssl=SSLContext()) as resp:
                # print("An api call to ", url, " is made at ", time.time())
                # print(resp)
                # The response object itself is not awaitable; read its body instead.
                return await resp.text()
    finally:
        active_calls -= 1
        # The last call of a batch releases the tasks that are waiting.
        if active_calls == 0:
            event.set()

delay = 0.1  # unused in this variant; kept so the signatures match the first example
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
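For comparison, here is a rough sketch of the asyncio.Condition variant mentioned above, using wait_for with a predicate on the number of active calls. It behaves slightly differently from strict batches: a new call starts as soon as any slot frees up. The `state` dictionary, its `peak` counter, and the short asyncio.sleep() standing in for the real request are assumptions made for illustration.

```python
import asyncio

MAX_CALLS = 5


async def fetch_one(url, cond, state):
    async with cond:
        # Block until fewer than MAX_CALLS requests are in flight.
        await cond.wait_for(lambda: state["active"] < MAX_CALLS)
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
    try:
        await asyncio.sleep(0.01)  # placeholder for the real aiohttp request
        return url
    finally:
        async with cond:
            state["active"] -= 1
            # Wake the waiters so they re-check the predicate.
            cond.notify_all()


async def fetch_all(urls):
    # Create the Condition inside the running loop.
    cond = asyncio.Condition()
    state = {"active": 0, "peak": 0}
    results = await asyncio.gather(
        *(fetch_one(url, cond, state) for url in urls),
        return_exceptions=True,
    )
    return results, state["peak"]


urls = [f"https://example.invalid/{i}" for i in range(12)]  # placeholder URLs
results, peak = asyncio.run(fetch_all(urls))
print(len(results), "responses; peak concurrency:", peak)
```

Passing `cond` and `state` as arguments also shows one way to avoid module-level variables.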

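For both examples, if your design needs to avoid global variables (strictly speaking, these are "module" variables), you can either move all the functions into a class and work on an instance, promoting the globals to instance attributes, or use a mutable container, such as a list that holds the active_calls value in its first item, and pass it along as a parameter.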
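As an illustration of the class-based option, the staggered-delay example might look roughly like this. The class name `StaggeredFetcher` is hypothetical, and the short asyncio.sleep() again stands in for the real aiohttp request.

```python
import asyncio
import time


class StaggeredFetcher:
    """Keeps the running delay on the instance instead of in a module variable."""

    def __init__(self, delay):
        self.delay = delay
        self.next_delay = 0.0  # offset handed to the next request

    async def fetch_one(self, url):
        # Claim this request's offset, then push the offset back for the next one.
        my_delay = self.next_delay
        self.next_delay += self.delay
        await asyncio.sleep(my_delay)
        started_at = time.monotonic()
        await asyncio.sleep(0.01)  # placeholder for the real aiohttp request
        return url, started_at

    async def fetch_all(self, urls):
        return await asyncio.gather(
            *(self.fetch_one(url) for url in urls),
            return_exceptions=True,
        )


urls = [f"https://example.invalid/{i}" for i in range(4)]  # placeholder URLs
fetcher = StaggeredFetcher(delay=0.05)
results = asyncio.run(fetcher.fetch_all(urls))
print([url for url, _ in results])
```

Each instance carries its own `next_delay`, so two fetchers with different rate limits can run in the same program without interfering with each other.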