如何在ProcessPool中处理SQLAlchemy连接?

rom*_*man 24 python sqlalchemy rabbitmq python-asyncio python-multiprocessing

我有一个反应器从RabbitMQ代理获取消息并触发工作方法在进程池中处理这些消息,如下所示:

反应堆

这是使用python实现的asyncio,loop.run_in_executor()concurrent.futures.ProcessPoolExecutor.

现在我想使用SQLAlchemy访问worker方法中的数据库.大多数情况下,处理将是非常简单和快速的CRUD操作.

反应器在开始时每秒处理10-50条消息,因此不能为每个请求打开新的数据库连接.相反,我想在每个进程中维护一个持久连接.

我的问题是:我怎么能这样做?我可以将它们存储在全局变量中吗?SQA连接池是否会为我处理这个问题?当反应堆停止时如何清理?

[更新]

  • 数据库是带有InnoDB的MySQL.

为什么选择带有进程池的模式?

当前实现使用不同的模式,其中每个使用者在其自己的线程中运行.不知何故,这不是很好.已经有大约200个消费者在他们自己的线程中运行,并且系统正在快速增长.为了更好地扩展,我们的想法是分离关注点并在I/O循环中使用消息并将处理委托给池.当然,整个系统的性能主要是I/O绑定.但是,处理大型结果集时CPU是一个问题.

另一个原因是"易用性".虽然消息的连接处理和消耗是异步实现的,但是worker中的代码可以是同步且简单的.

很快,很明显,通过工作者内部的持久网络连接访问远程系统是一个问题.这就是CommunicationChannels的用途:在worker中,我可以通过这些通道向消息总线发出请求.

我目前的一个想法是以类似的方式处理数据库访问:将语句通过队列传递到事件循环,然后将它们发送到数据库.但是,我不知道如何使用SQLAlchemy执行此操作.入口点在哪里?对象需要pickled在它们通过队列时传递.如何从SQA查询中获取此类对象?与数据库的通信必须异步工作,以免阻塞事件循环.我可以使用例如aiomysql作为SQA的数据库驱动程序吗?

olo*_*kki 6

如果在工作进程中假设您正在使用orm进行实例化,则可以轻松满足每个进程池进程一个数据库连接的要求session.

一个简单的解决方案是建立一个可以跨请求重用的全局会话:

# db.py
engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine)) 
Run Code Online (Sandbox Code Playgroud)

关于工人的任务:

# task.py
from db import engine, DBSession
def task():
    DBSession.begin() # each task will get its own transaction over the global connection
    ...
    DBSession.query(...)
    ...
    DBSession.close() # cleanup on task end
Run Code Online (Sandbox Code Playgroud)

参数pool_sizemax_overflow 自定义 create_engine使用的默认QueuePool.pool_size将确保您的进程仅在进程池中的每个进程保持1个连接存活.

如果您希望它重新连接,您可以使用DBSession.remove()它将从注册表中删除会话,并使其在下一次DBSession使用时重新连接.您还可以使用Poolrecycle参数在指定的时间后重新连接连接.

在开发/重新布线期间,您可以使用AssertionPool,如果从池中签出多个连接,则会引发异常,请参阅切换池实现以了解如何执行此操作.