如何使用芹菜任务访问orm?

use*_*981 4 celery flask celery-task flask-sqlalchemy celerybeat

我正在尝试使用sqlalchemy + celery beats为我的数据库中的特定类型的对象翻转一个布尔标志.但是如何从tasks.py文件中访问我的orm?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()
Run Code Online (Sandbox Code Playgroud)

我正在使用芹菜beat命令运行:

celery -A task worker -l INFO --beat

但是我收到以下错误:

raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context
Run Code Online (Sandbox Code Playgroud)

哪个指向with current_app.app_context()行

如果我删除current_app.app_context()行,我将收到以下错误:

RuntimeError: application not registered on db instance and no application bound to current context
Run Code Online (Sandbox Code Playgroud)

是否有一种特殊的方式来获取烧瓶 - sqlalchemy orm用于芹菜任务?或者我会尝试做更好的方法吗?

到目前为止,唯一可行的解​​决方法是db.init_app(app)在我的应用程序工厂模式之后添加以下行:

db.app = app

我正在关注这个回购来创建我的芹菜应用程序https://github.com/mattupstate/overholt/blob/master/overholt/factory.py

dav*_*ism 7

您收到该错误是因为current_app需要应用程序上下文才能生效,但您尝试使用它来设置应用程序上下文.您需要使用实际的应用程序来设置上下文,然后您可以使用current_app.

with app.app_context():
    # do stuff that requires the app context
Run Code Online (Sandbox Code Playgroud)

或者您可以使用Flask文档中描述的模式进行子类化,celery.Task以便默认情况下了解应用程序上下文.

from celery import Celery

def make_celery(app):
     celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
     celery.conf.update(app.config)
     TaskBase = celery.Task

     class ContextTask(TaskBase):
         abstract = True

         def __call__(self, *args, **kwargs):
             with app.app_context():
                 return TaskBase.__call__(self, *args, **kwargs)

     celery.Task = ContextTask
     return celery

 celery = make_celery(app)
Run Code Online (Sandbox Code Playgroud)