如何使用 Flask 应用工厂模式实现 Celery

Pet*_*nfo 3 python blueprint celery flask

我在使用 python Flask 应用程序工厂应用程序实现 celery 时遇到问题

我打算从应用程序初始化文件创建一个 Celery 应用程序的实例,如下所示:

from celery import Celery
celery = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
Run Code Online (Sandbox Code Playgroud)

调用时我无法使用其他蓝图中的 Celery。

API*_*API 5

def init_celery(app):
    celery = Celery()
    celery.conf.broker_url = app.config['CELERY_BROKER_URL']
    celery.conf.result_backend = app.config['CELERY_RESULT_BACKEND']
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Make celery tasks work with Flask app context"""
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)

在以下情况下初始化芹菜create_app

init_celery(app)
Run Code Online (Sandbox Code Playgroud)

了解 celery 如何在这个 Flask cookiecutter 中实现