Rate-limiting API calls with multiprocessing

Mus*_*ger 6 python api multiprocessing

I have a very simple piece of code that loads a list of ids from a file, then iterates over each id in the list and calls an API, passing the id value and dumping the API response content to a file.

I want to speed this up by making the API calls in parallel, but the API server allows at most 5 calls per second. Another key consideration is that each API pull is slow: on average a single call takes about 10 seconds to complete.

I would like to have multiple parallel processes, with some mechanism that ensures no more than 5 calls are made in any one second.

Here is the current code:

import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')

ids = ids['Id'].values.tolist()

def dump_data(data, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    data.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # api returns data as a pandas dataframe, takes about 10 secs
    dump_data(data, idx)


Parallel(n_jobs=10, verbose = 50)(delayed(get_api)(idx) for idx in ids) 

I am currently using joblib, but I am open to a different library if there is a better one for this.

How can I make sure that no more than 5 requests are issued in any given second, while still completing all the requests as quickly as possible?

I am also using Python 3.9 on Windows.

Boo*_*boo 4

Update 2

After rethinking this a bit, it makes more sense to use a standard multithreading or multiprocessing pool (according to your needs) and pass a CallingThrottle instance to your worker function, either indirectly as a global or explicitly as an argument. The worker function can then call the instance's throttle method directly at the precise point where throttling is needed, i.e. right before issuing the request to the website. Passing the throttle instance explicitly as an argument to the worker function should also allow you to use it with joblib (although I think that in your case all you need is a multithreading pool).

For example:

from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

CallingThrottleManager.register('CallingThrottle', CallingThrottle)

def init_pool(throttle):
    global calling_throttle

    calling_throttle = throttle

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    calling_throttle.throttle()
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    # Multithreading example:
    calling_throttle = CallingThrottle(5, 1) # 5 calls every 1 second
    pool = ThreadPool(20)
    init_pool(calling_throttle)
    start = time.time()
    results = pool.map(worker, range(20))
    print('Total elapsed time:', time.time() - start)
    pool.close()
    pool.join()

    print('\n', '-' * 30, '\n', sep='')

    # Multiprocessing example:
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1) # 5 calls every 1 second
        pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
        start = time.time()
        results = pool.map(worker, range(20))
        print('Total elapsed time:', time.time() - start)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
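
If you would rather not use joblib at all, the multithreading variant above maps directly onto your original code. A minimal sketch (untested, and assuming call_some_api, base_dir and ids exist exactly as in your question):

from multiprocessing.pool import ThreadPool

calling_throttle = CallingThrottle(5, 1)   # at most 5 calls per second

def get_api(idx):
    calling_throttle.throttle()            # block until a call slot is free
    data = call_some_api(idx)              # ~10 seconds per call
    data.to_csv(base_dir + '\\' + str(idx) + '.csv', header=True, index=False)

with ThreadPool(10) as pool:
    pool.map(get_api, ids)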

Using the throttle with joblib

import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager

from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

def dump_data(data, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    data.to_csv(filename, header=True, index=False)  # write data to file

def get_api(calling_throttle, idx):
    calling_throttle.throttle()
    data = call_some_api(idx)  # api returns data as a pandas dataframe, takes about 10 secs
    dump_data(data, idx)


def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    CallingThrottleManager.register('CallingThrottle', CallingThrottle)

    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1) # 5 calls every 1 second
        Parallel(n_jobs=10, verbose=50)(delayed(get_api)(calling_throttle, idx) for idx in ids)

if __name__ == '__main__':
    main()

Update 1

I originally implemented the rate-limiting algorithm referenced in @balmy's comment and noticed that the rate was sometimes being exceeded. @mindvirus had commented on exactly this issue, in a case where the OP there was trying to send 5 messages every 8 seconds:

This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages; then at times N * (8/5) (N = 1, 2, ...) you can send another message, resulting in more than 5 messages being sent in an 8-second period.
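
For context, the algorithm in question works roughly like this (a paraphrased sketch of the token-bucket logic from that linked answer, not its exact code):

import time

rate = 5.0   # allowed messages ...
per = 8.0    # ... per this many seconds
allowance = rate
last_check = time.time()

def try_send():
    # Return True if a message may be sent right now.
    global allowance, last_check
    current = time.time()
    allowance += (current - last_check) * (rate / per)
    last_check = current
    if allowance > rate:
        allowance = rate   # the bucket never holds more than `rate` credits
    if allowance < 1.0:
        return False       # over the limit; the caller must wait and retry
    allowance -= 1.0
    return True

Starting with a full bucket, 5 sends succeed immediately at time 0 and a new credit accrues every per/rate = 1.6 seconds, so up to 9 sends can go through within the first 8 seconds, which is exactly the overshoot described in that comment.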

So I am now using a new rate-limiting algorithm.


I created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method may be called. For example, the values rate=7 and per=3 mean that successive calls to apply_async will be throttled so that at most 7 calls are allowed in any 3-second window.

The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to call the function 20 times, so at 5 calls per second the last batch of 5 starts about 3 seconds after the first, and the best total running time we can hope for is roughly 3 + 10 = 13 seconds.

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################


def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1) # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

Prints:

2021-10-03 07:19:48.002628 x =  0
2021-10-03 07:19:48.002628 x =  1
2021-10-03 07:19:48.002628 x =  3
2021-10-03 07:19:48.002628 x =  4
2021-10-03 07:19:48.002628 x =  2
2021-10-03 07:19:49.005625 x =  5
2021-10-03 07:19:49.005625 x =  6
2021-10-03 07:19:49.005625 x =  8
2021-10-03 07:19:49.005625 x =  7
2021-10-03 07:19:49.005625 x =  9
2021-10-03 07:19:50.008775 x =  10
2021-10-03 07:19:50.008775 x =  11
2021-10-03 07:19:50.008775 x =  13
2021-10-03 07:19:50.008775 x =  12
2021-10-03 07:19:50.008775 x =  14
2021-10-03 07:19:51.012774 x =  15
2021-10-03 07:19:51.012774 x =  16
2021-10-03 07:19:51.012774 x =  17
2021-10-03 07:19:51.012774 x =  18
2021-10-03 07:19:51.012774 x =  19
Total elapsed time: 13.015560150146484

CPU-intensive example

In the example below I am using a RateLimitedProcessPool, because my worker function is 100% CPU-bound and takes about 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I submit 8 tasks at a rate of 3 tasks per second. The next 3 tasks start about 1 second after the first 3, and the final 2 tasks start about 1 second after those. Because the number of physical cores becomes the limiting factor, the total running time is a bit over 21 seconds.

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first task actually starting:
    LAG_TIME = .2 # seconds - needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1) # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    sum = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        sum += 1
    return sum

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x = ', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1) # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Run Code Online (Sandbox Code Playgroud)

Prints:

2021-10-03 09:51:32.857166 x =  0
2021-10-03 09:51:32.859168 x =  1
2021-10-03 09:51:32.864166 x =  2
2021-10-03 09:51:33.899890 x =  5
2021-10-03 09:51:33.899890 x =  3
2021-10-03 09:51:33.907888 x =  4
2021-10-03 09:51:34.924889 x =  6
2021-10-03 09:51:34.925888 x =  7
Total elapsed time: 21.22123622894287