使用 ThreadPoolExecutor 减少内存占用

Eyp*_*ros 1 python multithreading threadpoolexecutor

我使用它是ThreadPoolExecutor为了下载大量(~400k)关键帧图像。关键帧名称存储在文本文件中(假设keyframes_list.txt)。

我修改了文档中提供的示例,它似乎可以完美地工作,但有一个例外:很明显,该示例将每个链接传递给一个future对象,这些链接全部传递给一个可迭代对象(dict()准确地说)。该可迭代对象作为参数传递给函数as_completed()以检查 a 何时future完成。这当然需要一次性将大量文本加载到内存中。我执行此任务的 python 进程占用了 1GB RAM。

完整代码如下所示:

import concurrent.futures
import requests

def download_keyframe(keyframe_name):
    url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
    r = requests.get(url, allow_redirects=True)
    open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
    return True

keyframes_list_path = '/path/to/keyframes_list.txt'
future_to_url = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    with open(keyframes_list_path, 'r') as f:
        for i, line in enumerate(f):
            fields = line.split('\t')
            keyframe_name = fields[0]
            future_to_url[executor.submit(download_keyframe, keyframe_name)] = keyframe_name
    for future in concurrent.futures.as_completed(future_to_url):
        keyframe_name = future_to_url[future]
        try:
            future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (keyframe_name, exc))
        else:
            print('Keyframe: {} was downloaded.'.format(keyframe_name))
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是如何提供可迭代并保持较低的内存占用。我考虑过使用queue,但不确定它是否能ThreadPoolExecutor顺利配合。有没有简单的方法来控制future提交的数量ThreadPoolExecutor

rob*_*ert 5

AdamKG 的答案是一个好的开始,但他的代码将等到一个块被完全处理后再开始处理下一个块。因此,您会损失一些性能。

我建议采用稍微不同的方法,该方法将向执行器提供连续的任务流,同时强制并行任务的最大数量上限,以保持较低的内存占用量。

诀窍是用来concurrent.futures.wait跟踪已完成的 future 和仍待完成的 future:

def download_keyframe(keyframe_name):
    try:
        url = 'http://server/to//Keyframes/{}.jpg'.format(keyframe_name)
        r = requests.get(url, allow_redirects=True)
        open('path/to/be/saved/keyframes/{}.jpg'.format(keyframe_name), 'wb').write(r.content)
    except Exception as e:
        return keyframe_name, e

    return keyframe_name, None

MAX_WORKERS = 8
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    with open(keyframes_list_path, 'r') as fh:
        futures_notdone = set()
        futures_done = set()
        for i, line in enumerate(fh):
            # Submit new task to executor.
            fields = line.split('\t')
            keyframe_name = fields[0]
            futures_notdone.add(executor.submit(download_keyframe, keyframe_name))

            # Enforce upper bound on number of parallel tasks.
            if len(futures_notdone) >= MAX_WORKERS:
                done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
                futures_done.update(done)

# Process results.
for future in futures_done:
    keyframe_name, exc = future.result()
    if exc:
        print('%r generated an exception: %s' % (keyframe_name, exc))
    else:
        print('Keyframe: {} was downloaded.'.format(keyframe_name))
Run Code Online (Sandbox Code Playgroud)

当然,你也可以定期处理循环内的结果,以便futures_done时不时地清空循环。例如,您可以在每次项目数量超过futures_done1000(或适合您需要的任何其他数量)时执行此操作。如果您的数据集非常大并且结果本身就会导致大量内存使用,这可能会派上用场。