use*_*628 8 import celery flask
我有一个使用 Blueprints 和 Celery 的应用程序,代码如下:
config.py
import os
from celery.schedules import crontab
# Absolute path of the directory that contains this module.
basedir = os.path.dirname(os.path.abspath(__file__))
class Config:
    """Base settings inherited by the environment-specific configurations.

    The empty strings are placeholders; the subclasses in this module
    override them with concrete values.
    """

    # Falls back to an empty string when SECRET_KEY is unset (or empty) in
    # the environment.
    SECRET_KEY = os.environ.get('SECRET_KEY') or ''
    SQLALCHEMY_COMMIT_ON_TEARDOWN = True
    RECORDS_PER_PAGE = 40
    SQLALCHEMY_DATABASE_URI = ''

    # Celery settings; subclasses that run Celery override these.
    CELERY_BROKER_URL = ''
    CELERY_RESULT_BACKEND = ''
    CELERY_RESULT_DBURI = ''
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False
    CELERYBEAT_SCHEDULE = {}

    @staticmethod
    def init_app(app):
        """Hook for configuration-specific application setup.

        The base implementation does nothing and returns ``None``.
        """
class DevelopmentConfig(Config):
    """Settings for local development: debug on, CSRF protection enabled."""

    DEBUG = True
    WTF_CSRF_ENABLED = True
    APP_HOME = ''
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'

    # Celery uses the database both as broker and as result backend.
    CELERY_BROKER_URL = 'sqla+mysql://...'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://...'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False

    # Periodic tasks for celery beat: one e-mail task scheduled at 06:15.
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }
class TestConfig(Config):
    """Settings for the test suite: TESTING on, CSRF protection disabled."""

    DEBUG = True
    WTF_CSRF_ENABLED = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'
class ProdConfig(Config):
    """Settings for production: debug off, CSRF protection enabled."""

    DEBUG = False
    WTF_CSRF_ENABLED = True
    SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://...'

    # Celery uses the database both as broker and as result backend.
    CELERY_BROKER_URL = 'sqla+mysql://...celery'
    CELERY_RESULT_BACKEND = "database"
    CELERY_RESULT_DBURI = 'mysql://.../celery'
    CELERY_TIMEZONE = 'Europe/Kiev'
    CELERY_ENABLE_UTC = False

    # Periodic tasks for celery beat: one e-mail task scheduled at 06:15.
    CELERYBEAT_SCHEDULE = {
        'send-email-every-morning': {
            'task': 'app.workers.tasks.send_email_task',
            'schedule': crontab(hour=6, minute=15),
        },
    }
# Maps a configuration name (as passed to create_app) to its settings class.
# 'default' and 'production' both resolve to ProdConfig.
config = {
    'development': DevelopmentConfig,
    'default': ProdConfig,
    'production': ProdConfig,
    'testing': TestConfig,
}
class AppConf:
    """Holder for the current app and a snapshot of its config.

    Lets code outside an application context reach the application object
    and a copy of its configuration.
    """

    def __init__(self):
        # Both stay at these defaults until init_app() succeeds.
        self.app = None
        self.config = {}

    def init_app(self, app):
        """Remember *app* and store a copy of its configuration.

        Args:
            app: Any object exposing a ``config`` attribute that supports
                ``.copy()``.

        Raises:
            TypeError: If *app* has no ``config`` attribute; the holder is
                left unchanged in that case.
        """
        if not hasattr(app, 'config'):
            raise TypeError(
                "app must expose a 'config' attribute, got %s"
                % type(app).__name__
            )
        self.app = app
        # Copy so later changes to app.config do not leak into the snapshot.
        self.config = app.config.copy()
Run Code Online (Sandbox Code Playgroud)
__init__.py:
import os
from flask import Flask
from celery import Celery
from config import config, AppConf
# Module-level holder for the current app; tasks.py imports it from this
# package and create_app() fills it in.
app_conf = AppConf()


def create_app(config_name):
    """Build and return a Flask application for the named configuration.

    Args:
        config_name: Key into the ``config`` mapping imported from config.py.

    Returns:
        The configured Flask application with the staging blueprint
        registered.

    Raises:
        KeyError: If *config_name* is not a key of ``config``.
    """
    app = Flask(__name__)
    settings = config[config_name]
    app.config.from_object(settings)
    settings.init_app(app)
    app_conf.init_app(app)

    # Connect to Staging view.
    # Imported inside the function because the staging blueprint imports the
    # tasks module, which in turn imports this package.
    from staging.views import staging as staging_blueprint
    app.register_blueprint(staging_blueprint)
    return app
def make_celery(app=None):
    """Create a Celery instance whose tasks run inside the Flask app context.

    Args:
        app: Flask application to bind to. When omitted, one is created via
            ``create_app`` using the ``FLASK_CONFIG`` environment variable,
            falling back to ``'default'``.

    Returns:
        The configured Celery instance.
    """
    if app is None:
        app = create_app(os.getenv('FLASK_CONFIG') or 'default')

    # Flask's config is a dict subclass, so settings are read by key; the
    # application exposes it as ``app.config`` (there is no ``app.conf``).
    celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        """Task base class that wraps every call in ``app.app_context()``."""

        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)
tasks.py:
from app import make_celery, app_conf
# Celery instance bound to the current Flask app; when app_conf.app is still
# None, make_celery() creates a new application.
cel = make_celery(app_conf.app)


@cel.task
def send_realm_to_fabricdb(realm, form):
    """Celery task taking *realm* and *form*.

    The body was elided in the original post ("some actions...").
    """
    ...  # some actions...
Run Code Online (Sandbox Code Playgroud)
问题如下:蓝图 "staging" 使用了任务 send_realm_to_fabricdb,因此其中有这样一条导入:from tasks import send_realm_to_fabricdb
当我只运行应用程序时,一切正常。但是,当我尝试用 celery -A app.tasks worker -l info --beat 运行 Celery 时,它会执行 tasks.py 中的 cel = make_celery(app_conf.app),得到 app = None,并尝试再次创建应用程序:注册蓝图……于是这里就出现了循环导入。你能告诉我如何打破这个循环吗?提前致谢。
我手头没有代码可以验证这一点,但我认为,如果把 Celery 实例的创建从 tasks.py 移到 create_app 函数中,让它与 app 实例同时创建,效果会更好。
传给 Celery worker 的 -A 选项的参数并不需要包含任务,Celery 只需要 Celery 对象本身。因此,例如,你可以创建一个单独的启动脚本,比如 celery_worker.py,在其中调用 create_app 来创建 app 和 cel,然后通过 -A celery_worker.cel 把它交给 worker,完全不涉及蓝图。
希望这可以帮助。
| 归档时间: |
|
| 查看次数: |
3132 次 |
| 最近记录: |