Max*_*ysh 8 python django postgresql multithreading threadpool
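I'm using ThreadPoolExecutor to speed up data processing. The problem is that the thread pool creates new database connections and Django doesn't close them. I do have CONN_MAX_AGE in settings.py, and I've already tried calling django.db.close_old_connections().
Here is a code example: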
from concurrent.futures import ThreadPoolExecutor, wait

def compute(job):
    result = FooModel.objects.filter(...).aggregate(...)
    # aggregate() returns a dict, so unpack it into keyword arguments
    return BarModel.objects.create(**result)

def process(dataset):
    thread_pool = ThreadPoolExecutor(max_workers=20)
    futures = []
    for job in dataset:
        futures += [thread_pool.submit(compute, job)]
    results = list(r.result() for r in wait(futures)[0])
    return results

for i in range(0, 100):
    process(['foo', 'bar', 'qux'])
Is the Django ORM able to close idle DB connections if they were opened in another thread?
UPD: Interestingly, Django doesn't even know about these connections:
>>> from django.db import connections
>>> print(len(connections.all()))
2
mypostgresdb=# select count(*) from pg_stat_activity;
count
-------
182
(1 row)
And all the worker threads have definitely exited:
>>> # worker threads were closed:
>>> import threading
>>> threading.enumerate()
[<_MainThread(MainThread, started 140660203321088)>]
Dou*_*esh 11
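My guess is that the ThreadPoolExecutor is not what creates the DB connections; the jobs running in its threads are what open them and keep them open. I've had to deal with this already.
I ended up building this wrapper to make sure the connection is closed manually in the worker thread whenever a job finishes in a ThreadPoolExecutor. This should help ensure connections are not leaked; so far I haven't seen any leaks while using this code.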
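One way to picture this guess: Django keeps its connection objects in per-thread storage, so each worker thread that touches the ORM opens its own connection, and the main thread never sees it. The sketch below models that behaviour with plain threading.local. The names get_connection, opened and job are hypothetical and exist only for illustration; this is not Django's code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a per-thread connection registry (not Django's code).
_local = threading.local()
opened = []                      # every "connection" ever opened, by any thread
opened_lock = threading.Lock()


def get_connection():
    """Return the calling thread's connection, opening one on first use."""
    if not hasattr(_local, "conn"):
        _local.conn = {"thread": threading.get_ident(), "open": True}
        with opened_lock:
            opened.append(_local.conn)
    return _local.conn


def job(_):
    # The job touches the "database" and returns without closing anything.
    get_connection()


with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(job, range(4)))

# The main thread never opened a connection, so its own view is empty...
visible_from_main = hasattr(_local, "conn")
# ...while the connections opened by the workers were never closed.
still_open = sum(1 for conn in opened if conn["open"])
```

In this model the worker threads are gone once the with block exits, yet nothing ever marked their connections as closed. That matches the symptom in the question, where connections.all() in the main thread reports far fewer connections than pg_stat_activity does.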
from functools import wraps
from concurrent.futures import ThreadPoolExecutor
from django.db import connection


class DjangoConnectionThreadPoolExecutor(ThreadPoolExecutor):
    """
    When a function is passed into the ThreadPoolExecutor via either submit() or map(),
    this will wrap the function, and make sure that close_django_db_connection() is called
    inside the thread when it's finished so Django doesn't leak DB connections.

    Since map() calls submit(), only submit() needs to be overwritten.
    """

    def close_django_db_connection(self):
        # Closes the calling thread's connection for the default database alias.
        connection.close()

    def generate_thread_closing_wrapper(self, fn):
        @wraps(fn)
        def new_func(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            finally:
                # Runs in the worker thread, even if fn raises.
                self.close_django_db_connection()
        return new_func

    def submit(*args, **kwargs):
        """
        I took the args filtering/unpacking logic from

        https://github.com/python/cpython/blob/3.7/Lib/concurrent/futures/thread.py

        so I can properly get the function object the same way it was done there.
        """
        if len(args) >= 2:
            self, fn, *args = args
            fn = self.generate_thread_closing_wrapper(fn=fn)
        elif not args:
            raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object "
                            "needs an argument")
        elif 'fn' in kwargs:
            # Unpack self first; it is needed to build the wrapper.
            self, *args = args
            fn = self.generate_thread_closing_wrapper(fn=kwargs.pop('fn'))
        else:
            raise TypeError('submit expected at least 1 positional argument, '
                            'got %d' % (len(args) - 1))

        # Name the class explicitly: super(self.__class__, self) would recurse
        # forever if this executor were ever subclassed.
        return super(DjangoConnectionThreadPoolExecutor, self).submit(fn, *args, **kwargs)
You can then use it like this:
with DjangoConnectionThreadPoolExecutor(max_workers=15) as executor:
    results = list(executor.map(func, args_list))
...and be confident that the connections will be closed.
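On Python 3.8 or newer, the same idea can probably be written more compactly, because submit() can take fn as a positional-only parameter. The following is only a sketch under that assumption, not a drop-in replacement. CleanupThreadPoolExecutor and its cleanup argument are hypothetical names, and the cleanup callable is injected so the example runs without a Django project. In a Django project you would presumably pass django.db.connections.close_all, which closes the calling thread's connections for every configured alias.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


class CleanupThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical executor that runs cleanup() in the worker after each job."""

    def __init__(self, *args, cleanup, **kwargs):
        super().__init__(*args, **kwargs)
        self._cleanup = cleanup

    def submit(self, fn, /, *args, **kwargs):
        @wraps(fn)
        def wrapped(*a, **kw):
            try:
                return fn(*a, **kw)
            finally:
                # Executes in the worker thread, whether fn returned or raised.
                self._cleanup()

        # map() is implemented on top of submit(), so it is covered as well.
        return super().submit(wrapped, *args, **kwargs)


# Stand-in cleanup that records which thread it ran in. In a Django project
# this would be something like django.db.connections.close_all (assumption).
cleanup_calls = []


def record_cleanup():
    cleanup_calls.append(threading.get_ident())


with CleanupThreadPoolExecutor(max_workers=3, cleanup=record_cleanup) as executor:
    results = list(executor.map(lambda x: x * x, range(5)))
```

One trade-off to keep in mind: closing after every job means each job opens a fresh connection, so this trades connection reuse inside a worker thread for the guarantee that nothing is left open.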