在 Flask 蓝图中导入 Celery

Seb*_*nto 4 python celery flask

我有一个具有 MVC 结构的 Flask 应用程序:

my_app
??? server.py
??? requirements.txt
??? models
?   ??? __init__.py
    ??? model.py
??? controllers
    ??? __init__.py
    ??? client_controllers
        ???controller.py
    ??? another_controller.py
??? templates
Run Code Online (Sandbox Code Playgroud)

我使用蓝图在“控制器”中拆分服务器代码,所以我有这样的事情:

服务器.py:

from flask import Flask
from celery import Celery

from controllers.client_controllers.controller import controller

app = Flask(__name__)
app.secret_key = 'SECRET'

app.register_blueprint(controller)

# Celery Configuration
def make_celery(app):

    celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)


if __name__ == "__main__":
    app.run(host='0.0.0.0', debug=True)
Run Code Online (Sandbox Code Playgroud)

控制器.py:

from flask import Blueprint, render_template, json, request, redirect, url_for, abort, session



controller = Blueprint('controller', __name__,
                                     template_folder='templates/')

@celery.task()
def add_together(a, b):
    return a + b

@controller.route('/add', methods=['GET'])
def add():
    result = add_together.delay(23, 42)
    result.wait()
    return 'Processing'
Run Code Online (Sandbox Code Playgroud)

您可能会注意到,celery 没有导入到控制器中,因为我不知道如何将 celery 实例从server.py导入到我的controller.py 中而不会出错,我一直在尝试:

from ...server import celery
from ..server import celery
...etc
Run Code Online (Sandbox Code Playgroud)

但仍然因错误而失败。

小智 7

RuntimeError: Working outside of application context.发生烧瓶错误是因为您不在 Flask application_context() 中

您应该使用celery shared_task,这是您的 MVC 结构所需要的。

celery_flask/
??? celery_tasks
?   ??? app_tasks.py
?   ??? __init__.py
??? celery_worker.py
??? controllers
?   ??? __init__.py
?   ??? some_controller.py
??? __init__.py
??? server.py

Run Code Online (Sandbox Code Playgroud)

脚本app_tasks.py

#=====================
#   app_tasks.py
#=====================
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task(name='celery_tasks.add_together')
def add_together(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

@shared_task装饰返回一个代理,始终指向活动的芹菜实例:

>>> from celery import Celery, shared_task
>>> @shared_task
... def add_together(x, y):
...     return x + y
...
>>> app1 = Celery(broker='amqp://')
>>> add_together.app is app1
True
>>> app2 = Celery(broker='redis://')
>>> add_together.app is app2
True
Run Code Online (Sandbox Code Playgroud)

定义任务后,您可以使用对 Celery 应用程序的引用来调用它们。这个 celery 应用程序可能是烧瓶application_context() 的一部分。例子:

脚本服务器.py

from __future__ import absolute_import
from flask import Flask
from celery import Celery

from controllers.some_controller import controller

flask_app = Flask(__name__)
flask_app.secret_key = 'SECRET'

flask_app.register_blueprint(controller)

# Celery Configuration
def make_celery( app ):
    celery = Celery('flask-celery-app', backend=app.config['CELERY_RESULT_BACKEND'],
                    broker=app.config['CELERY_BROKER_URL'],
                    include=['celery_tasks.app_tasks'])
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

def list_celery_task( ):
    from celery.task.control import inspect
    i = inspect()
    i.registered_tasks()
    from itertools import chain
    t = set(chain.from_iterable( i.registered_tasks().values() ))
    print "registered_tasks={}".format( t )
#======================================
#                MAIN
#======================================
flask_app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(flask_app)
flask_app.celery = celery
list_celery_task( )


if __name__ == "__main__":
    flask_app.run(host='0.0.0.0', debug=True)
Run Code Online (Sandbox Code Playgroud)

脚本some_controller.py

#============================
#   some_controller.py
#============================
from __future__ import absolute_import
from flask import Blueprint
from flask import current_app

controller = Blueprint('controller', __name__,
                                     template_folder='templates/')

@controller.route('/add', methods=['GET'])
def add():
    print "calling add"
    result = current_app.celery.send_task('celery_tasks.add_together',args=[12,6])
    r = result.get()
    print 'Processing is {}'.format( r )
    return 'Processing is {}'.format( r )
Run Code Online (Sandbox Code Playgroud)

最后,启动worker来消费任务:

celery -A celery_worker worker --loglevel=DEBUG
Run Code Online (Sandbox Code Playgroud)

脚本celery_worker.py

#============================
#   celery_worker.py
#============================
from __future__ import absolute_import
from celery import Celery

# Celery Configuration
def make_celery():
    celery = Celery('flask-celery-app', backend='redis://localhost:6379',
                    broker='redis://localhost:6379',
                    include=['celery_tasks.app_tasks'])
    return celery

celery = make_celery()
print "tasks={}".format( celery.tasks.keys() )
Run Code Online (Sandbox Code Playgroud)


D D*_*wal 1

一种选择是将 celery 实例分配给应用程序实例,然后通过 Flask 的current_app.

在你的 server.py 中,只需添加:

celery = make_celery(app)
app.celery = celery
Run Code Online (Sandbox Code Playgroud)

然后你可以在你的controller.py中访问它:

from flask import current_app

@current_app.celery.task()
def add_together(a, b):
    return a + b
Run Code Online (Sandbox Code Playgroud)

  • 现在我收到“运行时错误:在应用程序上下文之外工作”。尝试用以下方法解决: with current_app.app_context():,但结果相同 (3认同)