aiohttp:速率限制并行请求

Bof*_*fin 8 python parallel-processing python-asyncio aiohttp

API通常具有用户必须遵循的速率限制。例如,让我们接受50个请求/秒。顺序请求需要0.5-1秒,因此太慢了,无法接近该限制。但是,使用aiohttp的并行请求超出了速率限制。

为了尽可能快地轮询API,需要对并行调用进行速率限制。

到目前为止,我发现的示例装饰了session.get,大致像这样:

session.get = rate_limited(max_calls_per_second)(session.get)
Run Code Online (Sandbox Code Playgroud)

这对于顺序调用非常有效。尝试在并行调用中实现此功能无法按预期进行。

这是一些示例代码:

async with aiohttp.ClientSession() as session:
    session.get = rate_limited(max_calls_per_second)(session.get)
    tasks = (asyncio.ensure_future(download_coroutine(  
          timeout, session, url)) for url in urls)
    process_responses_function(await asyncio.gather(*tasks))
Run Code Online (Sandbox Code Playgroud)

问题在于它将限制任务的排队速度。与的执行gather将或多或少地同时发生。两全其美;-)。

是的,我在aiohttp处发现了一个类似的问题:设置每秒的最大请求数,但没有答复回答限制请求速率的实际问题。同样,来自Quentin Pradet的博客文章仅在限制队列速率上起作用。

总结一下:如何限制并行请求的每秒aiohttp请求数?

Jyl*_*pah 10

我通过aiohttp.ClientSession()基于漏桶算法创建一个带有速率限制器的子类来解决这个问题。我asyncio.Queue()用于速率限制而不是Semaphores. 我只覆盖了该_request()方法。我觉得这种方法更干净,因为你只session = aiohttp.ClientSession()session = ThrottledClientSession(rate_limit=15).

class ThrottledClientSession(aiohttp.ClientSession):
        """Rate-throttled client session class inherited from aiohttp.ClientSession)""" 
    MIN_SLEEP = 0.1

    def __init__(self, rate_limit: float =None, *args,**kwargs) -> None: 
        super().__init__(*args,**kwargs)
        self.rate_limit = rate_limit
        self._fillerTask = None
        self._queue = None
        self._start_time = time.time()
        if rate_limit != None:
            if rate_limit <= 0:
                raise ValueError('rate_limit must be positive')
            self._queue = asyncio.Queue(min(2, int(rate_limit)+1))
            self._fillerTask = asyncio.create_task(self._filler(rate_limit))

     
    def _get_sleep(self) -> list:
        if self.rate_limit != None:
            return max(1/self.rate_limit, self.MIN_SLEEP)
        return None
        
    async def close(self) -> None:
        """Close rate-limiter's "bucket filler" task"""
        if self._fillerTask != None:
            self._fillerTask.cancel()
        try:
            await asyncio.wait_for(self._fillerTask, timeout= 0.5)
        except asyncio.TimeoutError as err:
            print(str(err))
        await super().close()


    async def _filler(self, rate_limit: float = 1):
        """Filler task to fill the leaky bucket algo"""
        try:
            if self._queue == None:
                return 
            self.rate_limit = rate_limit
            sleep = self._get_sleep()
            updated_at = time.monotonic()
            fraction = 0
            extra_increment = 0
            for i in range(0,self._queue.maxsize):
                self._queue.put_nowait(i)
            while True:
                if not self._queue.full():
                    now = time.monotonic()
                    increment = rate_limit * (now - updated_at)
                    fraction += increment % 1
                    extra_increment = fraction // 1
                    items_2_add = int(min(self._queue.maxsize - self._queue.qsize(), int(increment) + extra_increment))
                    fraction = fraction % 1
                    for i in range(0,items_2_add):
                        self._queue.put_nowait(i)
                    updated_at = now
                await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            print('Cancelled')
        except Exception as err:
            print(str(err))


    async def _allow(self) -> None:
        if self._queue != None:
            # debug 
            #if self._start_time == None:
            #    self._start_time = time.time()
            await self._queue.get()
            self._queue.task_done()
        return None


    async def _request(self, *args,**kwargs):
        """Throttled _request()"""
        await self._allow()
        return await super()._request(*args,**kwargs)
    ```
Run Code Online (Sandbox Code Playgroud)

  • 其他人同意这个解决方案并编写了一个包https://aiolimiter.readthedocs.io/en/latest/ (2认同)
  • @thatrandomperson,如果您指的是我的 `ThrottledClientSession()`,我已将其移至新的存储库 https://github.com/Jylpah/pyutils。请参阅:https://github.com/Jylpah/pyutils/blob/main/throttledclientsession.py (2认同)

Sra*_*raw 5

如果我对您的理解很好,是否要限制同时请求的数量?

内部有一个asyncio名为的对象Semaphore,它的工作方式类似于异步对象RLock

semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
    async with semaphore:
        # do what you want
#...
results = asyncio.gather([limit_wrap(url) for url in urls])
Run Code Online (Sandbox Code Playgroud)

更新

假设我发出了50个并发请求,它们都在2秒内完成。因此,它没有达到限制(每秒仅25个请求)。

这意味着我应该发出100个并发请求,它们也都在2秒内完成(每秒50个请求)。但是在您实际提出这些请求之前,您如何确定它们将完成多长时间?

或者如果您不介意每秒完成的请求,而是每秒的请求。您可以:

async def loop_wrap(urls):
    for url in urls:
        asyncio.ensure_future(download(url))
        await asyncio.sleep(1/50)

asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()
Run Code Online (Sandbox Code Playgroud)

上面的代码将Future每秒创建一个实例1/50

  • 不,这是关于限制每秒__requests__的数量,即每秒发送多少次请求。同时请求的数量取决于这些请求花费的时间,但是我们要使用的API对此没有限制。 (3认同)