Python API 速率限制 - 如何在全局范围内限制 API 调用

gyo*_*oho 5 python api rate-limiting

我正在尝试限制代码中的 API 调用。我已经找到了一个不错的 python 库ratelimiter==1.0.2.post0 https://pypi.python.org/pypi/ratelimiter

但是,这个库只能限制本地范围内的速率。即)在函数和循环中

# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
    pass


# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)

for i in range(100):
    with rate_limiter:
        do_something()
Run Code Online (Sandbox Code Playgroud)

因为我有几个函数在不同的地方进行 API 调用,所以我想将 API 调用限制在全局范围内。

例如,假设我想将 API 调用限制为每秒一次。并且,假设我有函数x并且y在其中进行了两个 API 调用。

@rate(...)
def x():
   ...

@rate(...)
def y():
   ...
Run Code Online (Sandbox Code Playgroud)

通过用 装饰函数limiter,我可以限制这两个函数的速率。

但是,如果我顺序执行上述两个函数,它会失去对全局范围内API 调用数量的跟踪,因为它们彼此不知道。因此,y将在执行后立即调用,x而无需再等待一秒钟。并且,这将违反每秒一次的限制。

有什么方法或库可以用来在 python 中全局限制速率?

小智 22

我遇到了同样的问题,我有一堆不同的函数调用相同的 API,我想在全球范围内进行速率限制。我最终做的是创建一个启用速率限制的空函数。

PS:我使用此处找到的不同速率限制库: https: //pypi.org/project/ratelimit/

from ratelimit import limits, sleep_and_retry

# 30 calls per minute
CALLS = 30
RATE_LIMIT = 60

@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
''' Empty function just to check for calls to API '''
return
Run Code Online (Sandbox Code Playgroud)

然后我只需在调用 API 的每个函数的开头调用该函数:

def get_something_from_api(http_session, url):
    check_limit()
    response = http_session.get(url)
    return response
Run Code Online (Sandbox Code Playgroud)

如果达到限制,程序将休眠,直到(在我的例子中)60 秒过去,然后正常恢复。

  • 我注意到这个问题要求全局速率限制,但此解决方案仅适用于一个特定进程/线程中的速率限制 (3认同)

gyo*_*oho 6

毕竟,我实现了自己的Throttler类。通过将每个 API 请求代理到该request方法,我们可以跟踪所有 API 请求。利用传递函数作为request方法参数的优势,它还缓存了结果以减少API调用。

class TooManyRequestsError(Exception):
    def __str__(self):
        return "More than 30 requests have been made in the last five seconds."


class Throttler(object):
    cache = {}

    def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
        # Dict of max number of requests of the API rate limit for each source
        self.max_rate = max_rate
        # Dict of duration of the API rate limit for each source
        self.window = window
        # Whether to throw an error (when True) if the limit is reached, or wait until another request
        self.throttle_stop = throttle_stop
        # The time, in seconds, for which to cache a response
        self.cache_age = cache_age
        # Initialization
        self.next_reset_at = dict()
        self.num_requests = dict()

        now = datetime.datetime.now()
        for source in self.max_rate:
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
            self.num_requests[source] = 0

    def request(self, source, method, do_cache=False):
        now = datetime.datetime.now()

        # if cache exists, no need to make api call
        key = source + method.func_name
        if do_cache and key in self.cache:
            timestamp, data = self.cache.get(key)
            logging.info('{} exists in cached @ {}'.format(key, timestamp))

            if (now - timestamp).seconds < self.cache_age:
                logging.info('retrieved cache for {}'.format(key))
                return data

        # <--- MAKE API CALLS ---> #

        # reset the count if the period passed
        if now > self.next_reset_at.get(source):
            self.num_requests[source] = 0
            self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))

        # throttle request
        def halt(wait_time):
            if self.throttle_stop:
                raise TooManyRequestsError()
            else:
                # Wait the required time, plus a bit of extra padding time.
                time.sleep(wait_time + 0.1)

        # if exceed max rate, need to wait
        if self.num_requests.get(source) >= self.max_rate.get(source):
            logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
            halt((self.next_reset_at.get(source) - now).seconds)

        self.num_requests[source] += 1
        response = method()  # potential exception raise

        # cache the response
        if do_cache:
            self.cache[key] = (now, response)
            logging.info('cached instance for {}, {}'.format(source, method))

        return response
Run Code Online (Sandbox Code Playgroud)