ThreadPoolExecutor:线程(future)完成并产生结果时不释放内存

Ond*_*áma 9 python multithreading memory-leaks python-multithreading

我在从 python 中的分页 API 响应下载大型数据集时遇到内存问题。当我尝试使用 ThreadPoolExecutor 并行下载多个页面时,我注意到已完成和已解决的 future 不会释放其内存占用。

我尝试通过以下两个示例来简化它。第一个使用max_workers设置为 1 的 ThreadPoolExecutor 下载所有页面(据我所知,这应该具有与简单循环相同的内存占用量):

from random import random
from concurrent.futures  import ThreadPoolExecutor, as_completed
import gc

TOTAL_PAGES = 60 

def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)] # mock some larga data sets

def threadpool_memory_test(): 
    processed_pages = 0
    with ThreadPoolExecutor(max_workers=1) as executor:
        future_to_page = {
            executor.submit(download_data, page): page for page in range(1, TOTAL_PAGES + 1)
        }
        
        for future in as_completed(future_to_page):
            records = future.result()
            # Do something with the downloaded data..
            processed_pages += 1
            print(f"Downloaded page: {processed_pages} / {TOTAL_PAGES} (number: {future_to_page[future]}) with {len(records)} records.")
            gc.collect() # just to be sure gc is called

if __name__ == "__main__":
    threadpool_memory_test()
Run Code Online (Sandbox Code Playgroud)

但是,当运行此脚本并绘制内存占用量时,它看起来像这样: 线程池的内存占用 即使循环并as_completed获得结果,期货也不会释放内存。

当我以简单循环下载和处理页面时。内存占用量符合预期:

from random import random

TOTAL_PAGES = 60 
def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)] # mock some larga data sets

def loop_memory_test():
    for page in range(1, TOTAL_PAGES + 1):
        records = download_data(page)
        # Do something with the downloaded data..
        print(f"Downloaded page: {page} / {TOTAL_PAGES} with {len(records)} records.")

if __name__ == "__main__":
    loop_memory_test()
Run Code Online (Sandbox Code Playgroud)

此类脚本的内存占用: 简单循环的内存占用

有没有办法释放已经获得结果的未来的记忆?

我正在 macOS Monterey 版本 12.5 (21G72) 上测试此功能

Ond*_*áma 5

根据 Stuart 的评论,我更新了脚本,现在它可以按预期工作(速度也快了 10 倍,并且占用了一小部分内存):

from random import random
from concurrent.futures  import ThreadPoolExecutor, as_completed
import gc

TOTAL_PAGES = 60 

def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)] # mock some larga data sets

def threadpool_memory_test(): 
    processed_pages = 0
    with ThreadPoolExecutor(max_workers=1) as executor:
        future_to_page = {
            executor.submit(download_data, page): page for page in range(1, TOTAL_PAGES + 1)
        }

        for future in as_completed(future_to_page):
            records = future.result()
            page = future_to_page.pop(future)
            # Do something with the downloaded data..
            processed_pages += 1
            print(f"Downloaded page: {processed_pages} / {TOTAL_PAGES} (number: {page}) with {len(records)} records.")
            gc.collect() # just to be sure gc is called

if __name__ == "__main__":
    threadpool_memory_test()
Run Code Online (Sandbox Code Playgroud)

归结起来就是这一行:

page = future_to_page.pop(future)
Run Code Online (Sandbox Code Playgroud)

确保删除对未来的引用。

现在的内存占用: 更新的线程池测试脚本的内存占用

谢谢你!

  • 斯图尔特评论在哪里? (5认同)
  • 你能发表斯图尔特的评论吗? (2认同)