如何覆盖 celery 任务的后端

scy*_*ale 5 python celery celery-task

我们使用 Redis 作为结果后端。然而,对于一项任务,我们希望重写它以使用 RabbitMQ。

Task.backend的文档说:

用于此任务的结果存储后端。默认为 CELERY_RESULT_BACKEND 设置

所以我假设我们可以设置Task.backendCELERY_RESULT_BACKEND.

所以我尝试这个:

celeryconfig.py

CELERY_RESULT_BACKEND = "redis://redis-host:7777"
Run Code Online (Sandbox Code Playgroud)

tasks.py

@app.task(backend='amqp://guest@localhost/tasks-stg')
def my_task(params):
    ...
Run Code Online (Sandbox Code Playgroud)

然而,工人失败了:

[2015-05-07 13:33:49,264: ERROR/Worker-1] Process Worker-1
Traceback (most recent call last):
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/process.py", line 292, in _bootstrap
    self.run()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 286, in run
    self.after_fork()
  File "/project/python2.7_x64/lib/python2.7/site-packages/billiard/pool.py", line 389, in after_fork
    self.initializer(*self.initargs)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/concurrency/prefork.py", line 81, in process_initializer
    app=app)
  File "/project/python2.7_x64/lib/python2.7/site-packages/celery/app/trace.py", line 178, in build_tracer
    store_result = backend.store_result
AttributeError: 'str' object has no attribute 'store_result'
Run Code Online (Sandbox Code Playgroud)

scy*_*ale 7

该文档不正确。Task.backend实际上是来自 的后端类的实例celery.backends。在这种情况下,要覆盖任务类,我必须这样做:

from celery.backends.amqp import AMQPBackend

@app.task(backend=AMQPBackend(app, url='amqp://guest@localhost/tasks-stg'))
def my_task(params):
    ...
Run Code Online (Sandbox Code Playgroud)

然而,工作人员继续使用默认类,并且似乎没有提供覆盖它的方法。