gyo*_*oho 5 python api rate-limiting
我正在尝试限制代码中的 API 调用。我已经找到了一个不错的 python 库ratelimiter==1.0.2.post0
https://pypi.python.org/pypi/ratelimiter
但是,这个库只能限制本地范围内的速率。即)在函数和循环中
# Decorator
@RateLimiter(max_calls=10, period=1)
def do_something():
pass
# Context Manager
rate_limiter = RateLimiter(max_calls=10, period=1)
for i in range(100):
with rate_limiter:
do_something()
Run Code Online (Sandbox Code Playgroud)
因为我有几个函数在不同的地方进行 API 调用,所以我想将 API 调用限制在全局范围内。
例如,假设我想将 API 调用限制为每秒一次。并且,假设我有函数x并且y在其中进行了两个 API 调用。
@rate(...)
def x():
...
@rate(...)
def y():
...
Run Code Online (Sandbox Code Playgroud)
通过用 装饰函数limiter,我可以限制这两个函数的速率。
但是,如果我顺序执行上述两个函数,它会失去对全局范围内API 调用数量的跟踪,因为它们彼此不知道。因此,y将在执行后立即调用,x而无需再等待一秒钟。并且,这将违反每秒一次的限制。
有什么方法或库可以用来在 python 中全局限制速率?
小智 22
我遇到了同样的问题,我有一堆不同的函数调用相同的 API,我想在全球范围内进行速率限制。我最终做的是创建一个启用速率限制的空函数。
PS:我使用此处找到的不同速率限制库: https: //pypi.org/project/ratelimit/
from ratelimit import limits, sleep_and_retry
# 30 calls per minute
CALLS = 30
RATE_LIMIT = 60
@sleep_and_retry
@limits(calls=CALLS, period=RATE_LIMIT)
def check_limit():
''' Empty function just to check for calls to API '''
return
Run Code Online (Sandbox Code Playgroud)
然后我只需在调用 API 的每个函数的开头调用该函数:
def get_something_from_api(http_session, url):
check_limit()
response = http_session.get(url)
return response
Run Code Online (Sandbox Code Playgroud)
如果达到限制,程序将休眠,直到(在我的例子中)60 秒过去,然后正常恢复。
毕竟,我实现了自己的Throttler类。通过将每个 API 请求代理到该request方法,我们可以跟踪所有 API 请求。利用传递函数作为request方法参数的优势,它还缓存了结果以减少API调用。
class TooManyRequestsError(Exception):
def __str__(self):
return "More than 30 requests have been made in the last five seconds."
class Throttler(object):
cache = {}
def __init__(self, max_rate, window, throttle_stop=False, cache_age=1800):
# Dict of max number of requests of the API rate limit for each source
self.max_rate = max_rate
# Dict of duration of the API rate limit for each source
self.window = window
# Whether to throw an error (when True) if the limit is reached, or wait until another request
self.throttle_stop = throttle_stop
# The time, in seconds, for which to cache a response
self.cache_age = cache_age
# Initialization
self.next_reset_at = dict()
self.num_requests = dict()
now = datetime.datetime.now()
for source in self.max_rate:
self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
self.num_requests[source] = 0
def request(self, source, method, do_cache=False):
now = datetime.datetime.now()
# if cache exists, no need to make api call
key = source + method.func_name
if do_cache and key in self.cache:
timestamp, data = self.cache.get(key)
logging.info('{} exists in cached @ {}'.format(key, timestamp))
if (now - timestamp).seconds < self.cache_age:
logging.info('retrieved cache for {}'.format(key))
return data
# <--- MAKE API CALLS ---> #
# reset the count if the period passed
if now > self.next_reset_at.get(source):
self.num_requests[source] = 0
self.next_reset_at[source] = now + datetime.timedelta(seconds=self.window.get(source))
# throttle request
def halt(wait_time):
if self.throttle_stop:
raise TooManyRequestsError()
else:
# Wait the required time, plus a bit of extra padding time.
time.sleep(wait_time + 0.1)
# if exceed max rate, need to wait
if self.num_requests.get(source) >= self.max_rate.get(source):
logging.info('back off: {} until {}'.format(source, self.next_reset_at.get(source)))
halt((self.next_reset_at.get(source) - now).seconds)
self.num_requests[source] += 1
response = method() # potential exception raise
# cache the response
if do_cache:
self.cache[key] = (now, response)
logging.info('cached instance for {}, {}'.format(source, method))
return response
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
15750 次 |
| 最近记录: |