使用Python线程通过速率限制对速度缓慢的API进行数千次调用

Sea*_*ett 9 python multithreading synchronization python-3.x

我想要对API进行数千次调用,这种调用有点慢 - 几秒钟才能得到响应.唯一的限制是我每秒最多可以提出一个请求.最好的方法是什么?我认为以下代码有效,但我觉得我应该能够以某种方式更好地利用线程库.我正在使用python 3.3

last_job = datetime.now()
for work in work_list:
    while (datetime.now()-last_job).total_seconds() < 1 or threading.active_count() >= max_threads:
        time.sleep(.1)
    threading.Thread(target=work_function, args=[work]).start()
    last_job = datetime.now()
Run Code Online (Sandbox Code Playgroud)

Gar*_*ees 16

如果要使用固定大小的线程池运行一堆作业,可以使用concurrent.futures.ThreadPoolExecutor,如下所示:

from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=5) as executor:
    for work in work_list:
        executor.submit(work_function, work)
Run Code Online (Sandbox Code Playgroud)

如果您想确保每秒最多拨打一次API,那么您需要在内部执行此操作work_function.提交作业时无法执行此操作,因为您不知道作业将等待线程可用的排队时间.

如果是我,我会将速率限制代码放入自己的类中,以便它可以重用:

from collections import Iterator
from threading import Lock
import time

class RateLimiter(Iterator):
    """Iterator that yields a value at most once every 'interval' seconds."""
    def __init__(self, interval):
        self.lock = Lock()
        self.interval = interval
        self.next_yield = 0

    def __next__(self):
        with self.lock:
            t = time.monotonic()
            if t < self.next_yield:
                time.sleep(self.next_yield - t)
                t = time.monotonic()
            self.next_yield = t + self.interval

api_rate_limiter = RateLimiter(1)

def work_function(work):
    next(api_rate_limiter)
    call_api(...)
Run Code Online (Sandbox Code Playgroud)

time.monotonic是在Python 3.3中引入的; 在旧版本的Python中你可以使用,time.time但是当系统时钟发生变化时,这会跳回来,所以你需要确保这不会导致睡眠过长:

                time.sleep(min(self.next_yield - t, self.interval))
Run Code Online (Sandbox Code Playgroud)