H-A*_*DJI 5 python mongodb celery odm python-asyncio
我在 celery Worker 内部使用异步 ODM 时遇到多个问题首先,我无法使用 celery Worker 信号初始化我的数据库模型,我正在使用 beanie 进行数据库连接。
第一次实施
from asyncer import syncify
from asgiref.sync import async_to_sync
client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]
async def db_session():
await init_beanie(
database=db,
document_models=[Project, User],
)
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
logger.info('Startup celery worker process')
async_to_sync(db_session)()
logger.info('FINISHED : Startup celery worker process')
async def get_users():
users = User.find()
users_list = await users.to_list()
return users_list
@celery_app.task
def pool_db():
async_to_sync(get_users)()
#syncify(get_users)() same error User class is not initialized yet (init_beanie should have already initialized all the models )
Run Code Online (Sandbox Code Playgroud)
通过此实现,我无法使用 User 和 Project 类访问我的数据库,并且会引发错误,就好像 User 和 Project 尚未实例化一样
解决方法是在模块级别调用 db_session() ,这解决了数据库模型实例化的问题,但是现在在查询数据库时,我从 celery 任务中收到以下错误
运行时错误:事件循环已关闭
第二次实施
from asyncer import syncify
from asgiref.sync import async_to_sync client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]
async def db_session():
await init_beanie(
database=db,
document_models=[Project, User],
)
# now init_beanie at module level
async_to_sync(db_session)()
async def get_users():
users = User.find()
users_list = await users.to_list()
return users_list
@celery_app.task
def pool_db():
# this raises the following Runtime error RuntimeError('Event loop is closed')
async_to_sync(get_users)()
#syncify(get_users)() same error
Run Code Online (Sandbox Code Playgroud)
我不太熟悉 asyncio 的实现方式以及 asyncer 和 asgiref 如何允许在同步线程内运行异步代码,这让我感到困惑,任何帮助都会得到帮助
经过多次调查,使用flower来监视worker并记录worker Id(进程id),结果发现Celeryworker本身不处理任何任务,它会产生其他子进程(这是我的情况,因为我使用的是默认的执行程序池,它是prefork),而信号(worker_ready.connect)仅在主管进程Celery工作进程上运行,而不是在子进程上运行,并且由于进程在内存方面是隔离的,这意味着您无法访问数据库连接或任何初始化的资源子进程。现在我将 celery 与 gevent 一起使用,它只生成 1 个进程,因为最初我的项目不需要 CPU 繁重的任务,这意味着我不需要 prefork 池提供的所有 cpu 功率