gbe*_*ven 5 python rest asynchronous python-asyncio aiohttp
场景:我需要从 Web 应用程序的 API 收集分页数据,该 API 的调用限制为每分钟 100 次。我需要返回的 API 对象每页包含 100 个项目,总共有 105 个页面,而且还在不断增加(总共约 10,500 个项目)。同步代码需要大约 15 分钟来检索所有页面,因此那时不必担心达到调用限制。但是,我想加快数据检索速度,因此我使用asyncio和实现了异步调用aiohttp。数据现在在 15 秒内下载 - 很好。
问题:我现在达到了呼叫限制,因此在最近 5 次左右的呼叫中收到 403 错误。
建议的解决方案我实现了try/except在get_data()函数中找到的。我拨打电话,然后当由于403: Exceeded call limit我退后back_off几秒钟而未成功拨打电话并重试retries多次时:
async def get_data(session, url):
retries = 3
back_off = 60 # seconds to try again
for _ in range(retries):
try:
async with session.get(url, headers=headers) as response:
if response.status != 200:
response.raise_for_status()
print(retries, response.status, url)
return await response.json()
except aiohttp.client_exceptions.ClientResponseError as e:
retries -= 1
await asyncio.sleep(back_off)
continue
async def main():
async with aiohttp.ClientSession() as session:
attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
attendee_data = await asyncio.gather(*[get_data(session, attendee_url) for attendee_url in attendee_urls])
return attendee_data
if __name__ == '__main__':
data = asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
问题:我如何限制 aiohttp 调用,使它们保持在 100 个调用/分钟阈值以下,而不会发出 403 请求退出?我尝试了以下模块,但它们似乎都没有做任何事情:ratelimiter,ratelimit和asyncio-throttle.
目标:每分钟进行 100 次异步调用,但在必要时退出并重试(403:超出调用限制)。
通过在每个请求之前添加延迟,您可以实现“最多 100 个请求/分钟”。
100 个请求/分钟相当于 1 个请求/0.6 秒。
async def main():
async with aiohttp.ClientSession() as session:
attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
coroutines = []
for attendee_url in attendee_urls:
coroutines.append(get_data(session, attendee_url))
await asyncio.sleep(0.6)
attendee_data = asyncio.gather(*coroutines)
return attendee_data
Run Code Online (Sandbox Code Playgroud)
除了请求速率限制之外,API 通常还会限制数量。同时请求的数量。如果是这样,您可以使用BoundedSempahore。
async def main():
sema = asyncio.BoundedSemaphore(50) # Assuming a concurrent requests limit of 50
...
coroutines.append(get_data(sema, session, attendee_url))
...
def get_data(sema, session, attendee_url):
...
for _ in range(retries):
try:
async with sema:
response = await session.get(url, headers=headers):
if response.status != 200:
response.raise_for_status()
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
902 次 |
| 最近记录: |