Ond*_*áma 9 python multithreading memory-leaks python-multithreading
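I'm running into memory problems when downloading a large data set from a paginated API response in Python. When I tried to download multiple pages in parallel with a ThreadPoolExecutor, I noticed that futures which have completed and whose results have been retrieved do not release their memory.
I tried to simplify it with the following two examples. The first one downloads all pages using a ThreadPoolExecutor with max_workers set to 1 (as far as I understand, this should have the same memory footprint as a simple loop):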
from random import random
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc

TOTAL_PAGES = 60


def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)]  # mock some large data sets


def threadpool_memory_test():
    processed_pages = 0
    with ThreadPoolExecutor(max_workers=1) as executor:
        future_to_page = {
            executor.submit(download_data, page): page for page in range(1, TOTAL_PAGES + 1)
        }
        for future in as_completed(future_to_page):
            records = future.result()
            # Do something with the downloaded data..
            processed_pages += 1
            print(f"Downloaded page: {processed_pages} / {TOTAL_PAGES} (number: {future_to_page[future]}) with {len(records)} records.")
            gc.collect()  # just to be sure gc is called


if __name__ == "__main__":
    threadpool_memory_test()
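However, when I run this script and plot the memory footprint, it looks like this:
The futures do not release their memory even after I loop over as_completed and retrieve the results.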
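The behaviour described above is consistent with how a Future works: it stores its result and keeps it alive for as long as the Future object itself is still referenced, and here the future_to_page dict references every future. The following is only a minimal sketch of that mechanism, not the original script; Payload, make_payload and result_survives_while_future_is_held are hypothetical names introduced for illustration.

```python
import gc
import weakref
from concurrent.futures import ThreadPoolExecutor


class Payload:
    """Hypothetical stand-in for one large page of records."""

    def __init__(self, size):
        self.data = [0.0] * size


def make_payload():
    return Payload(1000)


def result_survives_while_future_is_held():
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(make_payload)
        futures = {future: 1}  # plays the role of future_to_page
        # Weak reference: lets us observe the result without keeping it alive.
        ref = weakref.ref(future.result())
    # The executor has shut down, so no worker thread still holds the future.
    del future
    gc.collect()
    alive_while_in_dict = ref() is not None  # the dict still references the future
    futures.clear()
    gc.collect()
    alive_after_clear = ref() is not None  # nothing references the future any more
    return alive_while_in_dict, alive_after_clear


if __name__ == "__main__":
    print(result_survives_while_future_is_held())
```

In this sketch, calling result() does not detach the result from the future; the payload only becomes collectable once the last reference to the future is gone.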
When I download and process the pages in a simple loop, the memory footprint is as expected:
from random import random

TOTAL_PAGES = 60


def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)]  # mock some large data sets


def loop_memory_test():
    for page in range(1, TOTAL_PAGES + 1):
        records = download_data(page)
        # Do something with the downloaded data..
        print(f"Downloaded page: {page} / {TOTAL_PAGES} with {len(records)} records.")


if __name__ == "__main__":
    loop_memory_test()
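Is there a way to release the memory of a future whose result has already been retrieved?
I'm testing this on macOS Monterey version 12.5 (21G72).
Based on Stuart's comment, I updated the script and it now works as expected (it is also 10x faster and uses a fraction of the memory):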
from random import random
from concurrent.futures import ThreadPoolExecutor, as_completed
import gc

TOTAL_PAGES = 60


def download_data(page: int = 1) -> list[float]:
    # Send a request to some resource to get data
    print(f"Downloading page {page}.")
    return [random() for _ in range(1000000)]  # mock some large data sets


def threadpool_memory_test():
    processed_pages = 0
    with ThreadPoolExecutor(max_workers=1) as executor:
        future_to_page = {
            executor.submit(download_data, page): page for page in range(1, TOTAL_PAGES + 1)
        }
        for future in as_completed(future_to_page):
            records = future.result()
            # Remove the dict's reference to the future so its result can be freed
            page = future_to_page.pop(future)
            # Do something with the downloaded data..
            processed_pages += 1
            print(f"Downloaded page: {processed_pages} / {TOTAL_PAGES} (number: {page}) with {len(records)} records.")
            gc.collect()  # just to be sure gc is called


if __name__ == "__main__":
    threadpool_memory_test()
It boils down to this line:
page = future_to_page.pop(future)
which makes sure the reference to the future is removed.
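To check the effect of popping each future without plotting memory, one option is to count how many result objects are still alive once the loop has finished. This is a scaled-down sketch under assumptions: Page, download and live_pages_after_loop are hypothetical names, the payloads are tiny, and a weakref.WeakSet is used only as a way to observe which results have not been freed.

```python
import gc
import weakref
from concurrent.futures import ThreadPoolExecutor, as_completed


class Page:
    """Hypothetical stand-in for one page of downloaded records."""

    def __init__(self, number):
        self.number = number
        self.records = [0.0] * 1000


def live_pages_after_loop(total_pages, drop_futures):
    live = weakref.WeakSet()  # tracks Page objects that are still alive

    def download(number):
        page = Page(number)
        live.add(page)
        return page

    with ThreadPoolExecutor(max_workers=1) as executor:
        future_to_page = {
            executor.submit(download, n): n for n in range(1, total_pages + 1)
        }
        for future in as_completed(future_to_page):
            page = future.result()
            if drop_futures:
                future_to_page.pop(future)  # the fix from the updated script
            del page, future  # drop the loop's own references as well
    # Count only after the executor has shut down and its worker has exited,
    # so no worker thread can still be holding the last result.
    gc.collect()
    return len(live)


if __name__ == "__main__":
    print("kept in dict:", live_pages_after_loop(5, drop_futures=False))
    print("popped:", live_pages_after_loop(5, drop_futures=True))
```

With the futures left in the dict, every page should still be alive after the loop; with pop, none should be, which matches the difference seen in the updated script.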
Thank you!