SQLAlchemy在多线程应用程序中正确处理会话

and*_*ean 21 python session multithreading sqlalchemy

我无法理解如何有效地正确打开和关闭数据库会话,正如我所理解的sqlalchemy文档,如果我使用scoped_session构造我的Session对象,然后使用返回的Session对象来创建会话,它的线程安全,所以基本上每个线程会得到它自己的会话,并且不会有问题.现在下面的例子工作,我把它放在一个无限循环中,看它是否正确关闭会话,如果我正确监视它(在mysql中通过执行"SHOW PROCESSLIST;"),连接只是继续增长,它不会关闭它们,即使我使用了session.close(),甚至在每次运行结束时删除了scoped_session对象.我究竟做错了什么?我在更大的应用程序中的目标是使用所需的最少数量的数据库连接,因为我当前的工作实现在需要它的每个方法中创建一个新会话,并在返回之前关闭它,这似乎效率低下.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel


DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
        self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
        self.DBSession = scoped_session(
            sessionmaker(
                autoflush=True,
                autocommit=False,
                bind=self.db_engine
            )
        )

    def _worker(self):
        db_session = self.DBSession()
        while True:
            try:
                task_id = self.task_queue.get(False)
                try:
                    item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
                    # do something with item
                except Exception as exc:
                    # if an error occurrs we skip it
                    continue

                finally:
                    db_session.commit()
                    self.task_queue.task_done()
            except QueueEmpty:
                db_session.close()
                return

    def start(self):
        try:
            db_session = self.DBSession()
            all_items = db_session.query(MyModel).all()
            for item in all_items:
                self.task_queue.put(item.id)

            for _i in range(self.worker_count):
                t = Thread(target=self._worker)
                t.start()

            self.task_queue.join()
        finally:
            db_session.close()
            self.DBSession.remove()


if __name__ == '__main__':
    while True:
        mt_worker = MTWorker(worker_count=50)
        mt_worker.start()
Run Code Online (Sandbox Code Playgroud)

Sin*_*ion 39

您应该只被调用create_enginescoped_session一次每个进程(每个数据库).每个都将获得自己的连接池或会话(分别),因此您需要确保只创建一个池.只需将其设为全局模块级别即可.如果你需要更多地管理你的会话,你可能不应该使用scoped_session

要做的另一个改变是DBSession直接使用,就好像它是一个会话.如果需要,在scoped_session上调用会话方法将透明地创建线程本地会话,并将方法调用转发给会话.

另一件需要注意的是 pool_size 连接池,默认为5.对于许多应用程序来说没问题,但是如果要创建大量线程,则可能需要调整该参数

DATABASE_CONNECTION_INFO = 'mysql://username:password@localhost:3306/dbname'
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
DBSession = scoped_session(
    sessionmaker(
        autoflush=True,
        autocommit=False,
        bind=db_engine
    )
)


class MTWorker(object):

    def __init__(self, worker_count=5):
        self.task_queue = Queue()
        self.worker_count = worker_count
# snip
Run Code Online (Sandbox Code Playgroud)