事件循环在芹菜工作人员中关闭

H-A*_*DJI 5 python mongodb celery odm python-asyncio

我在 celery Worker 内部使用异步 ODM 时遇到多个问题首先,我无法使用 celery Worker 信号初始化我的数据库模型,我正在使用 beanie 进行数据库连接。

第一次实施

from asyncer import syncify
from asgiref.sync import async_to_sync 
client = AsyncIOMotorClient(
    DATABASE_URL, uuidRepresentation="standard" )
    db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
            logger.info('Startup celery worker process')
            async_to_sync(db_session)()
            logger.info('FINISHED : Startup celery worker process')
async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    async_to_sync(get_users)()
    #syncify(get_users)() same error User class is not initialized yet (init_beanie should have already initialized all the models )
Run Code Online (Sandbox Code Playgroud)

通过此实现,我无法使用 User 和 Project 类访问我的数据库,并且会引发错误,就好像 User 和 Project 尚未实例化一样

解决方法是在模块级别调用 db_session() ,这解决了数据库模型实例化的问题,但是现在在查询数据库时,我从 celery 任务中收到以下错误

运行时错误:事件循环已关闭

第二次实施

from asyncer import syncify
from asgiref.sync import async_to_sync client = AsyncIOMotorClient(
DATABASE_URL, uuidRepresentation="standard" )
db = client[DB_NAME]

async def db_session():
        await init_beanie(
        database=db,
        document_models=[Project, User],
    )
# now  init_beanie at module level
async_to_sync(db_session)()

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    # this raises the following Runtime error RuntimeError('Event loop is closed')
    async_to_sync(get_users)()
    #syncify(get_users)() same error 
Run Code Online (Sandbox Code Playgroud)

我不太熟悉 asyncio 的实现方式以及 asyncer 和 asgiref 如何允许在同步线程内运行异步代码,这让我感到困惑,任何帮助都会得到帮助

H-A*_*DJI 0

经过多次调查,使用flower来监视worker并记录worker Id(进程id),结果发现Celeryworker本身不处理任何任务,它会产生其他子进程(这是我的情况,因为我使用的是默认的执行程序池,它是prefork),而信号(worker_ready.connect)仅在主管进程Celery工作进程上运行,而不是在子进程上运行,并且由于进程在内存方面是隔离的,这意味着您无法访问数据库连接或任何初始化的资源子进程。现在我将 celery 与 gevent 一起使用,它只生成 1 个进程,因为最初我的项目不需要 CPU 繁重的任务,这意味着我不需要 prefork 池提供的所有 cpu 功率