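I'm running into several problems using an async ODM inside a Celery worker. First, I can't initialize my database models from a Celery worker signal. I'm using Beanie for the database connection.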
First implementation
from asgiref.sync import async_to_sync
from asyncer import syncify
from beanie import init_beanie
from celery.signals import worker_ready
from motor.motor_asyncio import AsyncIOMotorClient

# DATABASE_URL, DB_NAME, Project, User, logger and celery_app are defined elsewhere.
client = AsyncIOMotorClient(DATABASE_URL, uuidRepresentation="standard")
db = client[DB_NAME]

async def db_session():
    # Register the Beanie document models against the database.
    await init_beanie(
        database=db,
        document_models=[Project, User],
    )

@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
    logger.info('Startup celery worker process')
    async_to_sync(db_session)()
    logger.info('FINISHED : Startup celery worker process')

async def get_users():
    users = User.find()
    users_list = await users.to_list()
    return users_list

@celery_app.task
def pool_db():
    async_to_sync(get_users)()
    # syncify(get_users)() gives the same error:
    # User class is not initialized yet (init_beanie should have …
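One thing that might be worth ruling out is whether the initialization and the task run in the same process and on the same event loop. The sketch below is standard-library only and is not a confirmed fix: `fake_init_models` and `fake_get_users` are hypothetical stand-ins for `init_beanie` and `User.find().to_list()`, and `on_worker_process_start` is a hypothetical handler that, in a real worker, could be connected to Celery's `worker_process_init` signal so that it runs in each pool process. It only illustrates the pattern of keeping one long-lived event loop per process and reusing it for both initialization and tasks.

```python
import asyncio

_loop = None          # one event loop per worker process, created lazily
_initialized = False  # stand-in for "Beanie models are initialized"


def get_loop():
    """Return this process's long-lived event loop, creating it on first use."""
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


def run_sync(coro):
    """Run a coroutine to completion on the shared loop from synchronous code."""
    return get_loop().run_until_complete(coro)


async def fake_init_models():
    # Hypothetical stand-in for: await init_beanie(database=db, document_models=[...])
    global _initialized
    await asyncio.sleep(0)
    _initialized = True


async def fake_get_users():
    # Hypothetical stand-in for: await User.find().to_list()
    if not _initialized:
        raise RuntimeError("models are not initialized")
    await asyncio.sleep(0)
    return ["alice", "bob"]


def on_worker_process_start(**kwargs):
    # Assumption: in a real worker this would be connected to
    # celery.signals.worker_process_init, which fires in each pool process.
    run_sync(fake_init_models())


def pool_db():
    # Task body: reuses the same loop that performed the initialization.
    return run_sync(fake_get_users())
```

Calling `on_worker_process_start()` and then `pool_db()` in the same process returns the stand-in user list, while calling `pool_db()` without the initialization step raises the `RuntimeError`.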