Bof*_*fin 8 python parallel-processing python-asyncio aiohttp
API通常具有用户必须遵循的速率限制。例如,让我们接受50个请求/秒。顺序请求需要0.5-1秒,因此太慢了,无法接近该限制。但是,使用aiohttp的并行请求超出了速率限制。
为了尽可能快地轮询API,需要对并行调用进行速率限制。
到目前为止,我发现的示例装饰了session.get
,大致像这样:
session.get = rate_limited(max_calls_per_second)(session.get)
Run Code Online (Sandbox Code Playgroud)
这对于顺序调用非常有效。尝试在并行调用中实现此功能无法按预期进行。
这是一些示例代码:
async with aiohttp.ClientSession() as session:
session.get = rate_limited(max_calls_per_second)(session.get)
tasks = (asyncio.ensure_future(download_coroutine(
timeout, session, url)) for url in urls)
process_responses_function(await asyncio.gather(*tasks))
Run Code Online (Sandbox Code Playgroud)
问题在于它将限制任务的排队速度。与的执行gather
将或多或少地同时发生。两全其美;-)。
是的,我在aiohttp处发现了一个类似的问题:设置每秒的最大请求数,但没有答复回答限制请求速率的实际问题。同样,来自Quentin Pradet的博客文章仅在限制队列速率上起作用。
总结一下:如何限制并行请求的每秒aiohttp
请求数?
Jyl*_*pah 10
我通过aiohttp.ClientSession()
基于漏桶算法创建一个带有速率限制器的子类来解决这个问题。我asyncio.Queue()
用于速率限制而不是Semaphores
. 我只覆盖了该_request()
方法。我觉得这种方法更干净,因为你只session = aiohttp.ClientSession()
用session = ThrottledClientSession(rate_limit=15)
.
class ThrottledClientSession(aiohttp.ClientSession):
"""Rate-throttled client session class inherited from aiohttp.ClientSession)"""
MIN_SLEEP = 0.1
def __init__(self, rate_limit: float =None, *args,**kwargs) -> None:
super().__init__(*args,**kwargs)
self.rate_limit = rate_limit
self._fillerTask = None
self._queue = None
self._start_time = time.time()
if rate_limit != None:
if rate_limit <= 0:
raise ValueError('rate_limit must be positive')
self._queue = asyncio.Queue(min(2, int(rate_limit)+1))
self._fillerTask = asyncio.create_task(self._filler(rate_limit))
def _get_sleep(self) -> list:
if self.rate_limit != None:
return max(1/self.rate_limit, self.MIN_SLEEP)
return None
async def close(self) -> None:
"""Close rate-limiter's "bucket filler" task"""
if self._fillerTask != None:
self._fillerTask.cancel()
try:
await asyncio.wait_for(self._fillerTask, timeout= 0.5)
except asyncio.TimeoutError as err:
print(str(err))
await super().close()
async def _filler(self, rate_limit: float = 1):
"""Filler task to fill the leaky bucket algo"""
try:
if self._queue == None:
return
self.rate_limit = rate_limit
sleep = self._get_sleep()
updated_at = time.monotonic()
fraction = 0
extra_increment = 0
for i in range(0,self._queue.maxsize):
self._queue.put_nowait(i)
while True:
if not self._queue.full():
now = time.monotonic()
increment = rate_limit * (now - updated_at)
fraction += increment % 1
extra_increment = fraction // 1
items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
fraction = fraction % 1
for i in range(0,items_2_add):
self._queue.put_nowait(i)
updated_at = now
await asyncio.sleep(sleep)
except asyncio.CancelledError:
print('Cancelled')
except Exception as err:
print(str(err))
async def _allow(self) -> None:
if self._queue != None:
# debug
#if self._start_time == None:
# self._start_time = time.time()
await self._queue.get()
self._queue.task_done()
return None
async def _request(self, *args,**kwargs):
"""Throttled _request()"""
await self._allow()
return await super()._request(*args,**kwargs)
```
Run Code Online (Sandbox Code Playgroud)
如果我对您的理解很好,是否要限制同时请求的数量?
内部有一个asyncio
名为的对象Semaphore
,它的工作方式类似于异步对象RLock
。
semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
async with semaphore:
# do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])
Run Code Online (Sandbox Code Playgroud)
假设我发出了50个并发请求,它们都在2秒内完成。因此,它没有达到限制(每秒仅25个请求)。
这意味着我应该发出100个并发请求,它们也都在2秒内完成(每秒50个请求)。但是在您实际提出这些请求之前,您如何确定它们将完成多长时间?
或者如果您不介意每秒完成的请求,而是每秒的请求。您可以:
async def loop_wrap(urls):
for url in urls:
asyncio.ensure_future(download(url))
await asyncio.sleep(1/50)
asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)
上面的代码将Future
每秒创建一个实例1/50
。
归档时间: |
|
查看次数: |
2266 次 |
最近记录: |