异步请求退避/节流最佳实践

gbe*_*ven 5 python rest asynchronous python-asyncio aiohttp

场景:我需要从 Web 应用程序的 API 收集分页数据,该 API 的调用限制为每分钟 100 次。我需要返回的 API 对象每页包含 100 个项目,总共有 105 个页面,而且还在不断增加(总共约 10,500 个项目)。同步代码需要大约 15 分钟来检索所有页面,因此那时不必担心达到调用限制。但是,我想加快数据检索速度,因此我使用asyncio和实现了异步调用aiohttp。数据现在在 15 秒内下载 - 很好。

问题:我现在达到了呼叫限制,因此在最近 5 次左右的呼叫中收到 403 错误。

建议的解决方案我实现了try/exceptget_data()函数中找到的。我拨打电话,然后当由于403: Exceeded call limit我退后back_off几秒钟而未成功拨打电话并重试retries多次时:

async def get_data(session, url):
    retries = 3
    back_off = 60  # seconds to try again
    for _ in range(retries):
        try:
            async with session.get(url, headers=headers) as response:
                if response.status != 200:
                    response.raise_for_status()
                print(retries, response.status, url)
                return await response.json()
        except aiohttp.client_exceptions.ClientResponseError as e:
            retries -= 1
            await asyncio.sleep(back_off)
            continue

async def main():
    async with aiohttp.ClientSession() as session:
        attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
        attendee_data = await asyncio.gather(*[get_data(session, attendee_url) for attendee_url in attendee_urls])
        return attendee_data

if __name__ == '__main__':
    data = asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

问题:我如何限制 aiohttp 调用,使它们保持在 100 个调用/分钟阈值以下,而不会发出 403 请求退出?我尝试了以下模块,但它们似乎都没有做任何事情:ratelimiter,ratelimitasyncio-throttle.

目标:每分钟进行 100 次异步调用,但在必要时退出并重试(403:超出调用限制)。

Shi*_*iva 1

通过在每个请求之前添加延迟,您可以实现“最多 100 个请求/分钟”。

100 个请求/分钟相当于 1 个请求/0.6 秒。

async def main():

    async with aiohttp.ClientSession() as session:
        attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
        coroutines = []
        for attendee_url in attendee_urls:
            coroutines.append(get_data(session, attendee_url))
            await asyncio.sleep(0.6)
        attendee_data = asyncio.gather(*coroutines)
        return attendee_data
Run Code Online (Sandbox Code Playgroud)

除了请求速率限制之外,API 通常还会限制数量。同时请求的数量。如果是这样,您可以使用BoundedSempahore

async def main():
    sema = asyncio.BoundedSemaphore(50) # Assuming a concurrent requests limit of 50
...
            coroutines.append(get_data(sema, session, attendee_url))
...

def get_data(sema, session, attendee_url):

...

    for _ in range(retries):
        try:
            async with sema:
                response = await session.get(url, headers=headers):
                if response.status != 200:
                    response.raise_for_status()
...
Run Code Online (Sandbox Code Playgroud)