Aar*_*Ahn 4 python api delayed-execution python-asyncio aiohttp
我试图解决的问题: 我正在向服务器发出许多 api 请求。我试图在异步 api 调用之间创建延迟,以遵守服务器的速率限制策略。
我想要它做什么 我希望它的行为如下:
问题: 当我在代码中引入asyncio.sleep()或time.sleep()时,它仍然几乎立即发出 api 请求。它似乎延迟了print()的执行,但没有延迟 api 请求。我怀疑我必须在循环中创建延迟,而不是在 fetch_one() 或 fetch_all() 处,但不知道如何做到这一点。
代码块:
async def fetch_all(loop, urls, delay):
results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
return results
async def fetch_one(loop, url, delay):
#time.sleep(delay)
#asyncio.sleep(delay)
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(url, ssl=SSLContext()) as resp:
# print("An api call to ", url, " is made at ", time.time())
# print(resp)
return await resp
delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
Versions I'm using:
python 3.8.5
aiohttp 3.7.4
asyncio 3.4.3
Run Code Online (Sandbox Code Playgroud)
我将不胜感激任何指导我走向正确方向的建议!
调用asyncio.gather将“同时”启动所有请求 - 另一方面,如果您只是对每个任务使用锁或等待,那么您根本不会从使用并行性中获得任何好处。
如果您知道可以发出请求的速率,最简单的做法就是在连续的每个请求之前增加异步暂停 - 一个简单的全局变量可以做到这一点:
next_delay = 0.1
async def fetch_all(loop, urls, delay):
results = await asyncio.gather(*[fetch_one(loop, url, delay) for url in urls], return_exceptions=True)
return results
async def fetch_one(loop, url, delay):
global next_delay
next_delay += delay
await asyncio.sleep(next_delay)
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(url, ssl=SSLContext()) as resp:
# print("An api call to ", url, " is made at ", time.time())
# print(resp)
return await resp
delay = 0.1
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
Run Code Online (Sandbox Code Playgroud)
现在,如果您想要发出 5 个请求,然后发出下一个 5 个请求,您可以使用类似 的同步原语asyncio.Condition,在表达式上使用它wait_for来检查有多少 api 调用处于活动状态:
active_calls = 0
MAX_CALLS = 5
async def fetch_all(loop, urls, delay):
event = asyncio.Event()
event.set()
results = await asyncio.gather(*[fetch_one(loop, url, delay, event) for url in urls], return_exceptions=True)
return results
async def fetch_one(loop, url, delay, cond):
global active_calls
active_calls += 1
if active_calls > MAX_CALLS:
event.clear()
await event.wait()
try:
async with aiohttp.ClientSession(loop=loop) as session:
async with session.get(url, ssl=SSLContext()) as resp:
# print("An api call to ", url, " is made at ", time.time())
# print(resp)
return await resp
finally:
active_calls -= 1
if active_calls == 0:
event.set()
urls = ['some string list of urls']
loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_all(loop, urls, delay))
Run Code Online (Sandbox Code Playgroud)
对于这两个示例,如果您的任务在设计中避免使用全局变量(实际上,这些是“模块”变量) - 您可以将所有函数移至类中,并在实例上工作,并将全局变量提升为实例属性,或者使用可变容器,例如用于保存第一项中的值的列表active_calls,并将其作为参数传递。
| 归档时间: |
|
| 查看次数: |
4597 次 |
| 最近记录: |