Eyp*_*ros 1 python multithreading threadpoolexecutor
我使用它是ThreadPoolExecutor为了下载大量(~400k)关键帧图像。关键帧名称存储在文本文件中(假设keyframes_list.txt)。
我修改了文档中提供的示例,它似乎可以完美地工作,但有一个例外:很明显,该示例将每个链接传递给一个future对象,这些链接全部传递给一个可迭代对象(dict()准确地说)。该可迭代对象作为参数传递给函数as_completed()以检查 a 何时future完成。这当然需要一次性将大量文本加载到内存中。我执行此任务的 python 进程占用了 1GB RAM。
完整代码如下所示:
import concurrent.futures
import requests
def download_keyframe(keyframe_name):
url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
r = requests.get(url, allow_redirects=True)
open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
return True
keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
with open(keyframes_list_path, 'r') as f:
for i, line in enumerate(f):
fields = line.split('\t')
keyframe_name = fields[0]
future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
for future in concurrent.futures.as_completed(future_to_url):
keyframe_name = future_to_url[future]
try:
future.result()
except Exception as exc:
print('%r generated an exception: %s' % (keyframe_name, exc))
else:
print('Keyframe: {} was downloaded.'.format(keyframe_name))
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是如何提供可迭代并保持较低的内存占用。我考虑过使用queue,但不确定它是否能ThreadPoolExecutor顺利配合。有没有简单的方法来控制future提交的数量ThreadPoolExecutor?
AdamKG 的答案是一个好的开始,但他的代码将等到一个块被完全处理后再开始处理下一个块。因此,您会损失一些性能。
我建议采用稍微不同的方法,该方法将向执行器提供连续的任务流,同时强制并行任务的最大数量上限,以保持较低的内存占用量。
诀窍是用来concurrent.futures.wait跟踪已完成的 future 和仍待完成的 future:
def download_keyframe(keyframe_name):
try:
url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
r = requests.get(url, allow_redirects=True)
open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
except Exception as e:
return keyframe_name, e
return keyframe_name, None
MAX_WORKERS = 8
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
with open(keyframes_list_path, 'r') as fh:
futures_notdone = set()
futures_done = set()
for i, line in enumerate(fh):
# Submit new task to executor.
fields = line.split('\t')
keyframe_name = fields[0]
futures_notdone.add(executor.submit(download_keyframe, keyframe_name))
# Enforce upper bound on number of parallel tasks.
if len(futures_notdone) >= MAX_WORKERS:
done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
futures_done.update(done)
# Process results.
for future in futures_done:
keyframe_name, exc = future.result()
if exc:
print('%r generated an exception: %s' % (keyframe_name, exc))
else:
print('Keyframe: {} was downloaded.'.format(keyframe_name))
Run Code Online (Sandbox Code Playgroud)
当然,你也可以定期处理循环内的结果,以便futures_done时不时地清空循环。例如,您可以在每次项目数量超过futures_done1000(或适合您需要的任何其他数量)时执行此操作。如果您的数据集非常大并且结果本身就会导致大量内存使用,这可能会派上用场。