I have a very simple piece of code that loads a list of IDs from a file, then iterates over each ID in the list and calls an API, passing the ID value and dumping the API response content to a file.
I would like to speed this up by making the API calls in parallel, but the API server allows at most 5 calls per second. Another key consideration is that each API pull is slow: on average, a single call takes about 10 seconds to complete.
I would like to have multiple parallel processes with some way of ensuring that no more than 5 calls occur in any given second.
Here is the current code:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'  # base_dir is defined elsewhere
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # api returns data as a pandas dataframe, takes about 10 secs
    dump_data(data, idx)

Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)
I am currently using joblib, but I am open to another library if there is a better solution.
How can I make sure that no more than 5 requests are issued in any given second (while still completing all the requests as quickly as possible)?
I am using Python 3.9 on Windows.
Update 2
After rethinking this, it makes more sense to use a standard multithreading or multiprocessing pool (according to your needs) and to pass a CallingThrottle instance to your worker function (either indirectly as a global or explicitly as an argument), whose throttle method the worker function can then call directly at the precise point in the processing where throttling is needed (right before issuing the request to the website). Passing the throttle instance explicitly as an argument to the worker function should allow you to use it with joblib (although I believe all you need in your case is a multithreading pool).
For example:
from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                # Discard timestamps that have fallen outside the rate window:
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    # Still at the limit: sleep until the oldest call expires.
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())
# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
pass
CallingThrottleManager.register('CallingThrottle', CallingThrottle)
def init_pool(throttle):
global calling_throttle
calling_throttle = throttle
def worker(x):
"""
Emulate a task that takes 10 seconds to execute.
Cannot run more than 5 of these per second.
"""
from datetime import datetime
calling_throttle.throttle()
print(datetime.now(), 'x =', x)
time.sleep(10)
return x, x * x
def main():
# Multithreading example:
calling_throttle = CallingThrottle(5, 1) # 5 calls every 1 second
pool = ThreadPool(20)
init_pool(calling_throttle)
start = time.time()
results = pool.map(worker, range(20))
print('Total elapsed time:', time.time() - start)
pool.close()
pool.join()
print('\n', '-' * 30, '\n', sep='')
# Multiprocessing example:
with CallingThrottleManager() as manager:
calling_throttle = manager.CallingThrottle(5, 1) # 5 calls every 1 second
pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
start = time.time()
results = pool.map(worker, range(20))
print('Total elapsed time:', time.time() - start)
pool.close()
pool.join()
if __name__ == '__main__':
main()
Using the throttle with joblib:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())
# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
pass
def dump_data(df, idx):
filename = base_dir+'\\'+str(idx)+'.csv'
data.to_csv(filename, header= True, index=False) #write data to file
def get_api(calling_throttle, idx):
calling_throttle.throttle()
data = call_some_api(idx) #api returns data as pandas dataframe, take about 10 secs
dump_data(df, idx)
def main():
ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()
CallingThrottleManager.register('CallingThrottle', CallingThrottle)
with CallingThrottleManager() as manager:
calling_throttle = manager.CallingThrottle()
Parallel(n_jobs=10, verbose = 50)(delayed(get_api)(calling_throttle, idx) for idx in ids)
if __name__ == '__main__':
main()
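Since, as noted above, a multithreading pool is probably all that is needed here, the manager can also be skipped entirely by asking joblib for its threading backend, in which case a plain CallingThrottle instance can be shared directly. This is a minimal sketch under that assumption, reusing the CallingThrottle, get_api and ids definitions from the listing above; prefer='threads' is how joblib (in reasonably recent versions) selects the threading backend:

# Minimal sketch: with joblib's threading backend the throttle lives in a
# single process, so no BaseManager proxy is needed.
calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second

Parallel(n_jobs=10, prefer='threads', verbose=50)(
    delayed(get_api)(calling_throttle, idx) for idx in ids
)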
Update 1
I originally implemented the rate-limiting algorithm referenced in a comment by @balmy and noticed that the rate was sometimes exceeded. @mindvirus remarked on this phenomenon in a case where the OP wanted to send 5 messages every 8 seconds:
"This is good, but it can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8-second period."
(In that scenario, the 8-second window starting at time 0 contains the initial 5 messages plus the ones sent at 1.6, 3.2, 4.8 and 6.4 seconds, i.e. 9 messages.)
So I am now using a different rate-limiting algorithm.
I created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on "What's a good rate limiting algorithm?". These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method may be called. For example, rate=7 and per=3 means that successive calls to apply_async are throttled so that at most 7 calls are allowed every 3 seconds.
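For instance, construction could look like this (an illustrative sketch only; process_item is a hypothetical function, and the demo below uses rate=5, per=1 instead):

# At most 7 apply_async() submissions every 3 seconds,
# with up to 10 worker threads running concurrently.
pool = RateLimitedThreadPool(10, rate=7, per=3)
result = pool.apply_async(process_item, args=(42,))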
The code below demonstrates this with a simple worker function that emulates the OP's situation: the worker function takes 10 seconds to execute and calls to it must be limited to a maximum rate of 5 per second. The function needs to be called 20 times, so the best performance we can achieve is a total running time of roughly 13 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)
class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 07:19:48.002628 x = 0
2021-10-03 07:19:48.002628 x = 1
2021-10-03 07:19:48.002628 x = 3
2021-10-03 07:19:48.002628 x = 4
2021-10-03 07:19:48.002628 x = 2
2021-10-03 07:19:49.005625 x = 5
2021-10-03 07:19:49.005625 x = 6
2021-10-03 07:19:49.005625 x = 8
2021-10-03 07:19:49.005625 x = 7
2021-10-03 07:19:49.005625 x = 9
2021-10-03 07:19:50.008775 x = 10
2021-10-03 07:19:50.008775 x = 11
2021-10-03 07:19:50.008775 x = 13
2021-10-03 07:19:50.008775 x = 12
2021-10-03 07:19:50.008775 x = 14
2021-10-03 07:19:51.012774 x = 15
2021-10-03 07:19:51.012774 x = 16
2021-10-03 07:19:51.012774 x = 17
2021-10-03 07:19:51.012774 x = 18
2021-10-03 07:19:51.012774 x = 19
Total elapsed time: 13.015560150146484
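The threadpool and processpool decorators defined in the listing above are not exercised by the demo. As a rough sketch of how they might be used (assuming the same RateLimitedThreadPool and a placeholder task called slow_task), calling the decorated function submits it to the rate-limited pool and returns an AsyncResult:

pool = RateLimitedThreadPool(20, rate=5, per=1)

@threadpool(pool)
def slow_task(x):
    # Placeholder task: emulates 10 seconds of work.
    time.sleep(10)
    return x, x * x

async_results = [slow_task(x) for x in range(20)]  # submissions are throttled
results = [r.get() for r in async_results]         # wait for the results
pool.close()
pool.join()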
CPU-intensive example
In the example below I am using a RateLimitedProcessPool, because my worker function is 100% CPU and takes approximately 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I am submitting 8 tasks at a rate of 3 tasks per second. The second group of 3 tasks starts approximately 1 second after the first 3, and the final 2 tasks start about 1 second after that. Because the number of physical cores becomes the limiting factor, the total running time is a bit over 21 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first task actually starting:
    LAG_TIME = .2  # seconds - needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1)  # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    sum = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        sum += 1
    return sum

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x = ', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1)  # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 09:51:32.857166 x = 0
2021-10-03 09:51:32.859168 x = 1
2021-10-03 09:51:32.864166 x = 2
2021-10-03 09:51:33.899890 x = 5
2021-10-03 09:51:33.899890 x = 3
2021-10-03 09:51:33.907888 x = 4
2021-10-03 09:51:34.924889 x = 6
2021-10-03 09:51:34.925888 x = 7
Total elapsed time: 21.22123622894287