Ale*_*lex 4 python mysql multithreading sqlalchemy
我发现 SQLAlchemy 不会释放数据库连接(在我的例子中),因此这种情况堆积到可能导致服务器崩溃的程度。连接由不同的线程构成。
这是简化的代码
"""
Test to see DB connection allocation size while making call from multiple threads
"""
from time import sleep
from threading import Thread, current_thread
import uuid
from sqlalchemy import func, or_, desc
from sqlalchemy import event
from sqlalchemy import ForeignKey, Column, Integer, String, DateTime, UniqueConstraint
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import scoped_session, Session
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.types import Integer, DateTime, String, Boolean, Text, Float
from sqlalchemy.engine import Engine
from sqlalchemy.pool import NullPool
# MySQL
SQLALCHEMY_DATABASE = 'mysql'
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://amalgam:amalgam@localhost/amalgam?charset=utf8mb4' # https://stackoverflow.com/questions/47419943/pymysql-warning-1366-incorrect-string-value-xf0-x9f-x98-x8d-t
SQLALCHEMY_ECHO = False
SQLALCHEMY_ENGINE_OPTIONS = {'pool_size': 40, 'max_overflow': 0}
SQLALCHEMY_ISOLATION_LEVEL = "AUTOCOMMIT"
# DB Engine
# engine = create_engine(SQLALCHEMY_DATABASE_URI, echo=SQLALCHEMY_ECHO, pool_recycle=3600,
# isolation_level= SQLALCHEMY_ISOLATION_LEVEL,
# **SQLALCHEMY_ENGINE_OPTIONS
# ) # Connect to server
engine = create_engine(SQLALCHEMY_DATABASE_URI,
echo=SQLALCHEMY_ECHO,
# poolclass=NullPool,
pool_recycle=3600,
isolation_level= SQLALCHEMY_ISOLATION_LEVEL,
**SQLALCHEMY_ENGINE_OPTIONS
) # Connect to server
session_factory = sessionmaker(bind=engine)
Base = declarative_base()
# ORM Entity
class User(Base):
LEVEL_NORMAL = 'normal'
LEVEL_ADMIN = 'admin'
__tablename__ = "users"
id = Column(Integer, primary_key=True)
name = Column(String(100), nullable=True)
email = Column(String(100), nullable=True, unique=True)
password = Column(String(100), nullable=True)
level = Column(String(100), default=LEVEL_NORMAL)
# Workers
NO = 10
workers = []
_scoped_session_factory = scoped_session(session_factory)
def job(job_id):
session = _scoped_session_factory()
print("Job is {}".format(job_id))
user = User(name='User {} {}'.format(job_id, uuid.uuid4()), email='who cares {} {}'.format(job_id, uuid.uuid4()))
session.add(user)
session.commit()
session.close()
print("Job {} done".format(job_id))
sleep(10)
# Create worker threads
for i in range(NO):
workers.append(Thread(target=job, kwargs={'job_id':i}))
# Start them
for worker in workers:
worker.start()
# Join them
for worker in workers:
worker.join()
# Allow some time to see MySQL's "show processlist;" command
sleep(10)
Run Code Online (Sandbox Code Playgroud)
程序到达的那一刻
sleep(10)
Run Code Online (Sandbox Code Playgroud)
我运行
show processlist;
Run Code Online (Sandbox Code Playgroud)
它给出以下结果 - 意味着与数据库的所有连接仍然有效。
我怎样才能强制关闭这些连接?
注意:我可以利用
poolclass=NullPool
Run Code Online (Sandbox Code Playgroud)
但我觉得这个解决方案限制太多 - 我仍然希望能够访问数据库池,但能够在需要时以某种方式关闭连接
以下来自构造函数的签名QueuePool
\n\npool_size \xe2\x80\x93 要维护的池的大小,默认为 5。这是池中将持久保留的最大连接数。请注意,池开始时没有连接;一旦请求了这个\n数量的连接,该\n连接数量将保持不变。pool_size可以设置为0,表示没有大小限制;要\n禁用池化,请使用
\nNullPool代替。max_overflow \xe2\x80\x93 池的最大溢出大小。当签出的连接数达到 pool_size 中设置的大小时,\n将返回额外的连接,最多可达此限制。当这些\n其他连接返回到池中时,它们将被断开\n并被丢弃。由此可见,池允许的同时连接总数为 pool_size + max_overflow,而池允许的连接总数为 pool_size。max_overflow可以设置为-1表示没有溢出限制;并发连接总数没有限制。\n 默认为 10。
\n
SQLALCHEMY_ENGINE_OPTIONS = {\'pool_size\': 40, \'max_overflow\': 0}\nRun Code Online (Sandbox Code Playgroud)\n鉴于上述情况,此配置要求SQLAlchemy 保持最多 40 个连接打开。
\n如果您不喜欢这样,但希望保持某些连接可用,您可以尝试如下配置:
\nSQLALCHEMY_ENGINE_OPTIONS = {\'pool_size\': 10, \'max_overflow\': 30}\nRun Code Online (Sandbox Code Playgroud)\n这将在池中保留 10 个持久连接,如果同时请求,将爆发多达 40 个连接。任何超出配置的池大小的连接在检回池中后都会立即关闭。
\n| 归档时间: |
|
| 查看次数: |
4258 次 |
| 最近记录: |