Flask,蓝图使用芹菜任务并获得循环导入

use*_*628 8 import celery flask

我有一个Blueprints和Celery的应用程序代码在这里:

config.py

import os
from celery.schedules import crontab
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        pass


class DevelopmentConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }


class TestConfig(Config):
    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'


class ProdConfig(Config):
    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }

config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}


class AppConf:
    """
    Class to store current config even out of context
    """
    def __init__(self):
        self.app = None
        self.config = {}

    def init_app(self, app):
        if hasattr(app, 'config'):
            self.app = app
            self.config = app.config.copy()
        else:
            raise TypeError
Run Code Online (Sandbox Code Playgroud)

init .py:import os

from flask import Flask
from celery import Celery
from config import config, AppConf

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    config[config_name].init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)

    return app


def make_celery(app=None):
    app = app or create_app(os.getenv('FLASK_CONFIG') or 'default')
    celery = Celery(__name__, broker=app.config.CELERY_BROKER_URL)
    celery.conf.update(app.conf)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)

tasks.py:从app import make_celery,app_conf

cel = make_celery(app_conf.app)

@cel.task
def send_realm_to_fabricdb(realm, form):
    some actions...
Run Code Online (Sandbox Code Playgroud)

以下是问题:Blueprint"staging"使用任务send_realm_to_fabricdb,因此它会:from tasks import send_realm_to_fabricdb 当我刚运行应用程序时,一切正常但是,当我尝试运行celery时celery -A app.tasks worker -l info --beat,它会进入cel = make_celery(app_conf.app)tasks.py,得到app = None并尝试再次创建应用程序:注册蓝图...所以我在这里进行循环导入.你能告诉我如何打破这个循环吗?提前致谢.

Mig*_*uel 5

我没有代码,尝试了这一点,但我认为事情会更好地工作,如果你移动芹菜实例的创建出来的tasks.py,进入create_app功能,使其发生在同一时间app被创建的实例。

你给的芹菜工人的参数-A选项并不需要有任务,芹菜只需要芹菜的对象,因此,例如,你可以创建一个单独的启动脚本,说celery_worker.py的呼叫create_app建立appcel,然后把它交给工人-A celery_worker.cel,完全不涉及蓝图。

希望这可以帮助。

  • @ShulhiSapli是的,这是两个不同的进程,每个进程都有自己的应用程序实例。但是两者应以相同的方式创建,因此它们实际上是等效的(例如,它们具有相同的配置)。Celery worker中应用程序实例的唯一目的是为需要该代码的环境提供上下文。您不能以这种方式将“会话”,“ g”,“请求”变量从一个进程传递给另一个进程。 (3认同)