我的目标是创建一个函数,用来测量另一个函数的执行情况和资源使用情况。参考教程,我使用 Python 的 ThreadPoolExecutor 编写了以下代码:
from resource import *
from time import sleep
from concurrent.futures import ThreadPoolExecutor
class MemoryMonitor:
    """Poll this process's resource usage until told to stop.

    A caller runs ``measure_usage`` on a worker thread and sets
    ``keep_measuring`` to ``False`` from another thread to end the polling.
    """

    def __init__(self, interval: float = 0.1):
        """Create a monitor.

        Args:
            interval: Seconds to sleep between two samples.
        """
        # Flag read by measure_usage on every iteration; flip to False to stop.
        self.keep_measuring = True
        self.interval = interval

    def measure_usage(self) -> list:
        """Sample ``getrusage(RUSAGE_SELF)`` until ``keep_measuring`` is False.

        Returns:
            ``[max_usage, u_run_time, s_run_time]``: the largest ``ru_maxrss``,
            ``ru_utime`` and ``ru_stime`` values observed across all samples.
        """
        max_usage = 0
        u_run_time = 0
        s_run_time = 0
        while True:
            # One snapshot per iteration so the three figures belong together.
            usage = getrusage(RUSAGE_SELF)
            max_usage = max(max_usage, usage.ru_maxrss)
            u_run_time = max(u_run_time, usage.ru_utime)
            s_run_time = max(s_run_time, usage.ru_stime)
            # Check the flag *after* sampling: this guarantees at least one
            # sample even if the monitor was stopped before it started, and a
            # final sample taken after the stop request.
            if not self.keep_measuring:
                break
            sleep(self.interval)  # wait before taking the next sample
        return [max_usage, u_run_time, s_run_time]
def execute(function):
with ThreadPoolExecutor() as executor:
monitor = MemoryMonitor()
stats_thread = executor.submit(monitor.measure_usage)
try:
fn_thread = executor.submit(function)
result = fn_thread.result() …Run Code Online (Sandbox Code Playgroud) python encapsulation python-multithreading python-3.x concurrent.futures