Django ORM leaks connections when using ThreadPoolExecutor

Max*_*ysh 8 python django postgresql multithreading threadpool

I use a ThreadPoolExecutor to speed up data processing. The problem is that the thread pool creates new database connections and Django doesn't close them. I do have CONN_MAX_AGE set in settings.py, and I have already tried calling django.db.close_old_connections().
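
For reference, this is roughly what is already in place (a minimal sketch; the credentials, host/port and the CONN_MAX_AGE value shown here are placeholders, not the project's real settings):

# settings.py -- persistent connections are already enabled
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'mypostgresdb',
        'USER': 'myuser',        # placeholder
        'PASSWORD': 'secret',    # placeholder
        'HOST': 'localhost',
        'PORT': '5432',
        'CONN_MAX_AGE': 60,      # placeholder value; the real setting is simply non-zero
    }
}

# and what I already tried between batches:
from django.db import close_old_connections
close_old_connections()  # only touches connections owned by the calling thread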

Here is a code sample:

from concurrent.futures import ThreadPoolExecutor, wait

def compute(job):
    result = FooModel.objects.filter(...).aggregate(...)
    return BarModel.objects.create(result)

def process(dataset):
    thread_pool = ThreadPoolExecutor(max_workers=20)
    futures = []

    for job in dataset:
        futures.append(thread_pool.submit(compute, job))

    # wait() returns (done, not_done); collect the results of the finished futures
    results = list(r.result() for r in wait(futures)[0])
    return results

for i in range(0, 100):
    process(['foo', 'bar', 'qux'])

Is the Django ORM able to terminate idle DB connections if they were opened in another thread?


UPD: interestingly, Django doesn't even know about these connections:

>>> from django.db import connections
>>> print(len(connections.all()))
2

mypostgresdb=# select count(*) from pg_stat_activity;
 count 
-------
   182
(1 row)

And all of the worker threads have definitely been shut down:

>>> # worker threads were closed:
>>> import threading
>>> threading.enumerate()
[<_MainThread(MainThread, started 140660203321088)>]
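
Presumably this is because django.db.connections is thread-local, so the main thread only ever sees its own connection objects; the connections opened inside the pool's worker threads never show up here, and close_old_connections() called from the main thread never touches them. A quick sketch of that check, run from the same shell session as the snippets above:

from concurrent.futures import ThreadPoolExecutor
from django.db import connections

def connection_ids():
    # connections.all() only returns the wrappers owned by the calling thread
    return [id(conn) for conn in connections.all()]

print(connection_ids())                          # main thread's connection objects
with ThreadPoolExecutor(max_workers=1) as pool:
    print(pool.submit(connection_ids).result())  # different objects inside the worker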

Dou*_*esh 11

My guess is that the ThreadPoolExecutor isn't what creates the database connections; the threaded jobs are what hold on to them. I've had to deal with this before.

I ended up building this wrapper to make sure the DB connection is closed manually inside the thread whenever a job finishes in the ThreadPoolExecutor. It is useful for making sure connections don't leak, and so far I haven't seen any leaks while using this code.

from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(),
    this will wrap the function, and make sure that close_django_db_connection() is called
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """
    def close_django_db_connection(self):
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from

        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py

        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
            self, *args = args
        else:
            # mirrors the CPython 3.7 behaviour when no callable is provided
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args) - 1))

        return super(DjangoConnectionThreadPoolExecutor, self).submit(fn, *args, **kwargs)

Then you can use it like this:

    with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
        results = list(executor.map(func, args_list))

...and be confident that the connections get closed.
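
If you'd rather not subclass the executor, a lighter-weight sketch of the same idea is to close the connection at the end of each job yourself (shown here on the compute() function from the question; the ... are still the question's elided query):

from django.db import connection

def compute(job):
    try:
        result = FooModel.objects.filter(...).aggregate(...)
        return BarModel.objects.create(result)
    finally:
        # the connection belongs to the worker thread that opened it,
        # so it has to be closed from inside that thread
        connection.close()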