结果后端中的 Celery + rabbitmq

tim*_*tim 3 python rabbitmq celery

任务.py:

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

调用.py:

from tasks import add

result = add.delay(1, 4)

result.ready()
Run Code Online (Sandbox Code Playgroud)

celeryconfig.py:(在同一目录下)

BROKER_URL = 'amqp://guest@localhost//'

CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'

CELERY_TASK_SERIALIZER = 'json'

CELERY_RESULT_SERIALIZER = 'json'

CELERY_ACCEPT_CONTENT=['json']

CELERY_TIMEZONE = 'Europe/Oslo'

CELERY_ENABLE_UTC = True
Run Code Online (Sandbox Code Playgroud)

call.py我有一个错误:

AttributeError: 'DisabledBackend' 对象没有属性 '_get_task_meta_for'

我阅读了文档,我有结果后端,为什么它不起作用?如何解决?

谢谢!

Fla*_*ois 6

您不应Celery('tasks', broker='amqp://guest@localhost//')与 celeryconfig.py 文件一起使用。

您应该采用的方式(以及文档方式)是:

创建一个名为mypackage的包(假设 mypackage 不是子包),其中包含两个文件:

芹菜.py

from __future__ import absolute_import

from celery import Celery

app = Celery('mypackage',
             broker='amqp://guest@localhost//',
             backend='amqp://guest@localhost//',
             include=['mypackage.tasks'] #References your tasks. Donc forget to put the whole absolute path.
             )

app.conf.update(
        CELERY_TASK_SERIALIZER = 'json',
        CELERY_RESULT_SERIALIZER = 'json',
        CELERY_ACCEPT_CONTENT=['json'],
        CELERY_TIMEZONE = 'Europe/Oslo',
        CELERY_ENABLE_UTC = True
                )
Run Code Online (Sandbox Code Playgroud)

任务.py

from mypackage.celery import app
@app.task
def add(x, y):
    return x + y
Run Code Online (Sandbox Code Playgroud)

你的 call.py 文件没问题。

使用celery -A mypackage worker命令行启动 Celery Worker ,在mypackage的父目录中

然后你可以启动另一个 Python 解释器,使用 call.py 和瞧!


tim*_*tim -4

我在我的代码中发现了一个错误。在tasks.py中:

app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')
Run Code Online (Sandbox Code Playgroud)

需要后台参数。