Memory leak in a multithreaded Python project (?)

jli*_*ski 5 python memory multithreading subprocess concurrent.futures

I have a small project (keep in mind I'm only a Python beginner). It consists of several smaller .py files. First, there is main.py, which looks like this:

#!/usr/bin/env python3
import os

from Controller import Controller
import config as cfg

if __name__ == "__main__":
    for path in cfg.paths.values():
        # Same effect as `mkdir -p`: creates parents, no error if it already exists
        os.makedirs(path, exist_ok=True)
    Con = Controller()
    Con.start()

So this program just creates some directories, creates a Controller object and runs its start method. Controller.py looks like this:

import concurrent.futures
import logging
import multiprocessing
import time

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot
from AM import AM
import config as cfg

m = multiprocessing.Manager()
q = m.Queue()

class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                            ignore_directories=True, case_sensitive=False)

    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s.", event.src_path)
        q.put(event.src_path)

    def on_moved(self, event):
        # For a move, dest_path is the file's new location
        logging.info("AM Watchdog received moved event - %s.", event.dest_path)
        q.put(event.dest_path)

class Controller:
    def __init__(self):
        pass

    def _start_func(self, newFname):
        try:
            res = AM(newFname).start()
            return res
        except Exception:
            return 1

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        
        try:
            while True:
                time.sleep(1)
                with concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers) as executor:
                    futures = {}
                    while not q.empty():
                        newFname = q.get()
                        futures_to_work = executor.submit(self._start_func, newFname)
                        futures[futures_to_work] = newFname

                    for future in concurrent.futures.as_completed(futures):
                        name = futures.pop(future)
                        print(f"{name} completed")
                    
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

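This one is more complex than the previous program (and probably has some problems). Its purpose is to watch a directory (cfg.paths["ipath"]) and wait for TEST* files to appear. When one does, its name is added to a queue. While the queue is not empty, a future is submitted to a concurrent.futures.ThreadPoolExecutor, which passes the name to the _start_func method. This method creates a new AM object (from AM.py) and runs it. The idea behind this is that I want a program that waits for TEST* files to appear and then does some work on them, while being able to handle several files at the same time and processing them in the order they appear. AM.py looks like this: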

import subprocess

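class AM():
    def __init__(self, fname):
        # Remember the file so start() can be called without arguments,
        # matching how Controller.py calls AM(newFname).start()
        self.fname = fname

    def test_func(self, fname):
        # Pass an argument list (not a split string) so paths with spaces survive
        c = ["some_faulty_unix_program", fname]
        p = subprocess.run(c, capture_output=True, text=True)

        out, err = p.stdout, p.stderr
        if out:
            print(out)
        if err:
            print(err)
            return 1
        return 0

    def start(self):
        res = self.test_func(self.fname)
        return res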

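This class runs a Unix program in a new process on the file detected by Controller.py. The program often produces errors (because the TEST* files are not always valid). I don't think it matters much what the program is, but just in case: it is solve-field from astrometry.net, and the TEST* files are images of the sky.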
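Since the program fails often, it may help to be precise about what counts as a failure. AM.test_func treats any stderr output as an error. A minimal sketch, assuming the program reports failure through a non-zero exit status (as most command-line tools do), that checks `returncode` instead and adds a `timeout` so a hung run cannot pile up. `run_tool` and the 600-second default are hypothetical; `sys.executable` stands in for the real program so the sketch runs anywhere.

```python
import subprocess
import sys


def run_tool(args, timeout=600):
    """Run an external command; return 0 on success, 1 on failure.

    Hypothetical helper: `args` is the full argument list, e.g.
    ["solve-field", path]. A list keeps file names with spaces intact.
    """
    try:
        p = subprocess.run(args, capture_output=True, text=True, timeout=timeout)
    except (OSError, subprocess.TimeoutExpired) as exc:
        # OSError: program missing or not executable.
        # TimeoutExpired: subprocess.run kills and reaps the child before raising.
        print(f"failed to run {args[0]}: {exc}", file=sys.stderr)
        return 1
    if p.stdout:
        print(p.stdout)
    if p.returncode != 0:
        # Many tools print warnings to stderr even when they succeed, so the
        # exit status is a more reliable failure signal than "stderr is non-empty".
        print(p.stderr, file=sys.stderr)
        return 1
    return 0


if __name__ == "__main__":
    ok = run_tool([sys.executable, "-c", "print('solved')"])
    bad = run_tool([sys.executable, "-c", "import sys; sys.exit(3)"])
    print(ok, bad)
```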

The whole project runs as a service, like this:

[Unit]
Description = test astrometry service
After = network.target

[Service]
Type = simple
ExecStart = /bin/bash -c "/home/project/main.py"
Restart = always
RestartSec = 2
TimeoutStartSec = infinity
User = root
Group = users
PrivateTmp = true
Environment = "PATH=/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/project"

[Install]
WantedBy = multi-user.target

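When I enable this service and check it with systemctl status my_project.service, it uses about 76.0M of memory. I usually leave it running all night (I have a system that takes a picture of the night sky every minute, and this project computes the astrometry of those pictures). The next morning, when I check the memory with systemctl status, it is around 200-300M if there were no errors, and around 3.5G if something went wrong (for example, when I had moved a config file this Unix program uses, so it produced errors from the very start). Why does the memory grow like this? Is it caused by a problem in my code, or by this Unix program?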
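One way to narrow this down, sketched under the assumption of a Unix host: the standard `resource` module reports the peak resident set size of the Python process itself (`RUSAGE_SELF`) and of the largest child that has already exited and been waited for (`RUSAGE_CHILDREN`). If the first value keeps climbing overnight, the growth is in the Python service; if only the second is large, it comes from the external program, whose private memory is released when each run exits. `peak_rss` and `log_memory` are hypothetical helpers; `ru_maxrss` is in kilobytes on Linux and in bytes on macOS.

```python
import logging
import resource
import subprocess
import sys


def peak_rss():
    """Return (service_peak, largest_child_peak) resident set sizes.

    Units follow ru_maxrss: kilobytes on Linux, bytes on macOS. The child
    value only covers children that have exited and been waited for, which
    subprocess.run does for you.
    """
    own = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    child = resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss
    return own, child


def log_memory(tag=""):
    # Hypothetical helper: call it after each processed file or from a timer
    own, child = peak_rss()
    logging.info("%s peak RSS: service=%d, largest child=%d", tag, own, child)
    return own, child


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    log_memory("start")
    # A throwaway child that allocates ~50 MB, standing in for solve-field
    subprocess.run([sys.executable, "-c", "x = b'x' * 50_000_000"], check=True)
    log_memory("after one child")
```

Calling something like `log_memory` after each file is processed would show, over one night, which of the two values is the one that grows.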

Boo*_*boo 1

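It is not clear to me where the memory leak is occurring. If it is in "some_faulty_unix_program", which AM.test_func runs, then you need to find or create a replacement for it. But I believe the code can be simplified/optimized somewhat to reduce the chance of a memory leak occurring elsewhere.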

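First, I don't think you need to re-create the multithreading pool over and over. It also appears that watchdog uses multithreading, so you could use a more efficient queue.Queue instance instead of a managed queue. But ultimately I think that by refactoring the Controller.py code a bit so that your handler submits tasks to the multithreading pool directly, you can eliminate the explicit queue entirely. Would the following work?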

import concurrent.futures
import logging
from threading import Event

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot
from AM import AM
import config as cfg

class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Create multithreading pool just once:
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers)
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                            ignore_directories=True, case_sensitive=False)
    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s.", event.src_path)
        self._run_start_func(event.src_path)

    def on_moved(self, event):
        # For a move, dest_path is the file's new location
        logging.info("AM Watchdog received moved event - %s.", event.dest_path)
        self._run_start_func(event.dest_path)

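    def _run_start_func(self, newFname):
        future = self._executor.submit(self._start_func, newFname)
        # Don't call future.result() here: it would block watchdog's event thread,
        # so only one file would be processed at a time. Report completion instead:
        future.add_done_callback(lambda f: print(f"{newFname} completed"))
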
    def _start_func(self, newFname):
        try:
            res = AM(newFname).start()
            return res
        except Exception:
            return 1

class Controller:
    def __init__(self):
        pass

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        event = Event()
        try:
            # Block until keyboard interrupt:
            event.wait()
        except KeyboardInterrupt:
            observer.stop()
            observer.join()
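Whether the handler waits on the future makes a large difference: watchdog delivers events to the handler from its observer thread, so a handler that calls `future.result()` cannot pick up the next event until the current file is done, and at most one task runs at a time regardless of `cfg.workers`. A self-contained sketch comparing the two submission styles; `slow_task` and `dispatch` are hypothetical, and the loop stands in for the observer thread.

```python
import concurrent.futures
import time


def slow_task(name):
    # Stand-in for one run of the external program (hypothetical)
    time.sleep(0.2)
    return 0


def dispatch(names, wait_each, workers=4):
    """Submit one task per name, like an event handler would.

    Returns (elapsed_seconds, names_completed).
    """
    done = []
    start = time.monotonic()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        for name in names:  # this loop plays the role of watchdog's event thread
            future = executor.submit(slow_task, name)
            if wait_each:
                future.result()  # handler blocks, so the next "event" waits
                done.append(name)
            else:
                # Called when the task finishes (in the worker thread,
                # or immediately if it is already done)
                future.add_done_callback(lambda f, n=name: done.append(n))
    # Leaving the with-block waits for every submitted task to finish
    return time.monotonic() - start, done


if __name__ == "__main__":
    names = [f"TEST{i}" for i in range(4)]
    print("blocking on each future: %.2fs" % dispatch(names, wait_each=True)[0])
    print("add_done_callback:       %.2fs" % dispatch(names, wait_each=False)[0])
```

With four 0.2-second tasks and four workers, the blocking style takes roughly the sum of the task times, while the callback style takes roughly the time of one task.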