jli*_*ski · score 5 · tags: python, memory, multithreading, subprocess, concurrent.futures
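I have a small project (please keep in mind that I am only a Python beginner). The project consists of several smaller .py files. First, there is a main.py that looks like this: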
```python
import os

from Controller import Controller
import config as cfg

if __name__ == "__main__":
    # Create the working directories (same effect as `mkdir -p`)
    for path in cfg.paths.values():
        os.makedirs(path, exist_ok=True)
    Con = Controller()
    Con.start()
```
So this program just creates some directories, creates the Controller object and runs its start method. Controller.py looks like this:
```python
import concurrent.futures
import logging
import multiprocessing
import time

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot

from AM import AM
import config as cfg

m = multiprocessing.Manager()
q = m.Queue()


class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                             ignore_directories=True, case_sensitive=False)

    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s.", event.src_path)
        q.put(event.src_path)

    def on_moved(self, event):
        # For a move/rename the file's new location is dest_path (src_path is the old one)
        logging.info("AM Watchdog received moved event - %s.", event.dest_path)
        q.put(event.dest_path)


class Controller:
    def __init__(self):
        pass

    def _start_func(self, newFname):
        try:
            res = AM(newFname).start()
            return res
        except Exception:
            return 1

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        try:
            while True:
                time.sleep(1)
                # A new pool is created (and shut down) on every pass of the loop
                with concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers) as executor:
                    futures = {}
                    while not q.empty():
                        newFname = q.get()
                        futures_to_work = executor.submit(self._start_func, newFname)
                        futures[futures_to_work] = newFname
                    for future in concurrent.futures.as_completed(futures):
                        name = futures.pop(future)
                        print(f"{name} completed")
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
```
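This program is more complex than the previous one (and probably has some problems). Its purpose is to watch a directory (cfg.paths["ipath"]) and wait for TEST* files to appear. When one appears, its name is added to a queue. While the queue is not empty, a concurrent.futures.ThreadPoolExecutor creates a new future and passes the name to the _start_func method. This method creates a new AM object (from AM.py) and runs it. The idea behind it is that I want a program that waits for TEST* files to appear and then does something with them, while being able to handle several files at the same time and process them in the order in which they appear.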
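One caveat about "in the order in which they appear": a ThreadPoolExecutor hands queued tasks to its workers in submission order, but concurrent.futures.as_completed yields futures in the order they *finish*. The sketch below illustrates this with hypothetical file names and sleep times standing in for processing time; `process` and `completion_order` are illustrative names, not part of the project.

```python
import concurrent.futures
import time


def process(name, seconds):
    # Hypothetical stand-in for AM(name).start(); sleeping simulates a slow or fast file
    time.sleep(seconds)
    return name


def completion_order(jobs, workers=2):
    """Submit jobs in order and return their names in the order they finish."""
    finished = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process, name, seconds) for name, seconds in jobs]
        for future in concurrent.futures.as_completed(futures):
            finished.append(future.result())
    return finished


# Hypothetical files: TEST1 takes much longer than the other two
jobs = [("TEST1", 0.6), ("TEST2", 0.1), ("TEST3", 0.1)]
print(completion_order(jobs))
```

With two workers, TEST2 and TEST3 should be reported before TEST1 even though TEST1 was submitted first; with `workers=1` the completion order matches the submission order.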
AM.py looks like this:
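```python
import subprocess


class AM:
    def __init__(self, fname):
        # Remember the file so that start() can be called without arguments,
        # as Controller._start_func does
        self.fname = fname

    def test_func(self, fname):
        # Pass the arguments as a list so file names containing spaces stay intact
        c = ["some_faulty_unix_program", fname]
        p = subprocess.run(c, capture_output=True, text=True)
        out, err = p.stdout, p.stderr
        if out:
            print(out)
        if err:
            print(err)
            return 1
        return 0

    def start(self):
        res = self.test_func(self.fname)
        return res
```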
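This program runs a Unix program in a new process (on the file detected by Controller.py). That program often produces errors (because the TEST* files are not always valid). I don't think it matters much what the program is, but just in case: it is solve-field from astrometry.net, and the TEST* files are images of the sky.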
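A side note on test_func: command-line tools often write progress messages or warnings to stderr even when they succeed, so treating any stderr output as a failure can misclassify runs. A hedged alternative is to judge success by the exit status and to add a timeout so a hung run cannot occupy a worker thread forever. `run_tool` and the 600-second default are hypothetical, and whether solve-field's exit status reliably signals failure is an assumption worth checking.

```python
import subprocess
import sys


def run_tool(cmd, timeout=600.0):
    """Run an external command; return 0 on success and 1 on failure, like test_func.

    Assumption: failure is signalled by a non-zero exit status rather than by any
    output on stderr.
    """
    try:
        p = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run() kills and waits for the child before re-raising
        print(f"timed out after {timeout} s: {cmd}", file=sys.stderr)
        return 1
    except OSError as exc:
        # e.g. FileNotFoundError when the program is not on PATH
        print(f"could not start {cmd}: {exc}", file=sys.stderr)
        return 1
    if p.stdout:
        print(p.stdout)
    if p.returncode != 0:
        print(p.stderr, file=sys.stderr)
        return 1
    return 0
```

In AM.test_func this would be called as `run_tool(["some_faulty_unix_program", fname])`.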
The whole project runs as a service, like this:
```ini
[Unit]
Description = test astrometry service
After = network.target
[Service]
Type = simple
ExecStart = /bin/bash -c "/home/project/main.py"
Restart = always
RestartSec = 2
TimeoutStartSec = infinity
User = root
Group = users
PrivateTmp = true
Environment = "PATH=/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/project"
[Install]
WantedBy = multi-user.target
```
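When I enable this service and check it with systemctl status my_project.service, it uses about 76.0M of memory. I usually leave it running all night (I have a system that takes a photo of the night sky every minute, and this project computes the astrometry for those photos). The next morning, when I check the memory with systemctl status, it is about 200-300M if there were no errors, and about 3.5G if something went wrong (for example, when I moved a configuration file that this Unix program uses, so it produced errors from the start). Why does memory grow like this? Is it caused by a problem in my code, or by this Unix program?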
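To narrow down where the growth happens, one option is to log the Python process's own memory alongside that of its finished children every so often. systemctl status reports memory for the service's whole control group, which also counts child processes while they run, so comparing these figures can show whether the Python process itself keeps growing. The sketch below uses only the standard library and assumes Linux (there, ru_maxrss is in kilobytes, and /proc/self/status is Linux-specific); the function names are hypothetical.

```python
import resource
import subprocess
import sys


def peak_rss_mib():
    """Peak resident memory, in MiB, of this process and of its largest finished child."""
    # On Linux ru_maxrss is reported in kilobytes
    own = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    children = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss / 1024
    return own, children


def current_rss_mib():
    """Current (not peak) resident memory of this process, read from procfs (Linux only)."""
    with open("/proc/self/status") as f:
        for line in f:
            if line.startswith("VmRSS:"):
                return int(line.split()[1]) / 1024  # the value is given in kB
    return None


if __name__ == "__main__":
    # Run one short child process so the RUSAGE_CHILDREN figure is non-zero
    subprocess.run([sys.executable, "-c", "pass"])
    own, children = peak_rss_mib()
    print(f"current: {current_rss_mib():.1f} MiB, peak: {own:.1f} MiB, largest child: {children:.1f} MiB")
```

Writing these numbers to the log every few minutes from the main loop would show, after a night's run, which of them tracks the growth reported by systemctl.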
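It is not clear to me where the memory leak is occurring. If it is in "some_faulty_unix_program", which AM.test_func runs, then you will need to find or write a replacement for it. But I believe your code can be simplified and optimized in a few ways that reduce the chance of a memory leak occurring elsewhere.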
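First, I don't think you need to recreate the thread pool over and over. It also appears that watchdog uses threads (not processes), so you could use a more efficient queue.Queue instance instead of a managed queue. Ultimately, though, I think that by refactoring Controller.py so that your handler submits tasks to the thread pool itself, you can eliminate the explicit queue entirely. Would the following work?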
```python
import concurrent.futures
import logging
from threading import Event

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot

from AM import AM
import config as cfg


class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Create multithreading pool just once:
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers)
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                             ignore_directories=True, case_sensitive=False)

    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s.", event.src_path)
        self._run_start_func(event.src_path)

    def on_moved(self, event):
        # For a move/rename the file's new location is dest_path
        logging.info("AM Watchdog received moved event - %s.", event.dest_path)
        self._run_start_func(event.dest_path)

    def _run_start_func(self, newFname):
        future = self._executor.submit(self._start_func, newFname)
        # Report completion from a callback rather than calling future.result() here:
        # blocking would stall watchdog's event thread, so files would run one at a time
        future.add_done_callback(lambda _: print(f"{newFname} completed"))

    def _start_func(self, newFname):
        try:
            res = AM(newFname).start()
            return res
        except Exception:
            return 1


class Controller:
    def __init__(self):
        pass

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        event = Event()
        try:
            # Block until keyboard interrupt:
            event.wait()
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
```
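If you would rather keep an explicit queue, the first suggestion above (a plain queue.Queue feeding one long-lived pool) can be tried without watchdog or solve-field. In this sketch a loop calling `q.put()` stands in for the watchdog handler, `process` stands in for `AM(name).start()`, the consumer blocks on `q.get()` instead of polling with `time.sleep`, and a done-callback rather than `future.result()` records completion so the consumer never waits on a single file. All names are hypothetical.

```python
import concurrent.futures
import queue
import threading


def process(name):
    # Hypothetical stand-in for AM(name).start()
    return f"{name} processed"


def consume(q, executor, results):
    """Hand each queued file name to one long-lived pool as soon as it arrives."""
    while True:
        name = q.get()  # blocks until an item is available, so no sleep/poll loop
        if name is None:  # sentinel value: stop consuming
            break
        future = executor.submit(process, name)
        # Record completion without blocking this loop (list.append is thread-safe in CPython)
        future.add_done_callback(lambda f: results.append(f.result()))


def run(names, workers=2):
    q = queue.Queue()  # plain thread-safe queue; no multiprocessing.Manager server process
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        consumer = threading.Thread(target=consume, args=(q, executor, results))
        consumer.start()
        for name in names:  # in the real program the watchdog handler would call q.put()
            q.put(name)
        q.put(None)
        consumer.join()
    # Leaving the with-block waited for every submitted task to finish
    return results


print(sorted(run(["TEST1", "TEST2", "TEST3"])))
# ['TEST1 processed', 'TEST2 processed', 'TEST3 processed']
```

The pool is created once and reused for every file, which is the main difference from the question's loop, where a new ThreadPoolExecutor is built and torn down every second.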