我想做两件事
app.steps['worker'].add(LoadConfig)
完美无缺
但我无法进行 SetQueue 启动步骤
只是我的 SetQueue 现在看起来像:
class SetQueue(bootsteps.StartStopStep):
requires = (Consumer, )
def start(self, parent, **kwargs):
parent.add_task_queue('q_name', exchange='q_name', routing_key='q_name')
app.steps['consumer'].add(SetQueue)
Run Code Online (Sandbox Code Playgroud)
它不起作用。
我认为我的问题是我不明白 - 在什么时候(requires=(???, )) 可以添加队列。
在 Celery 中有不同的“蓝图”,其中每个蓝图由多个引导步骤组成。
要启动的第一个蓝图是 Worker,例如启动执行池,要启动的最后一个蓝图是Consumer
,它连接到代理并启动任务消费者等。
因此,您requires
不是指向引导步骤,而是指向蓝图。
我看到您的引导步骤添加了另一个队列以供使用,因此它依赖Tasks
引导步骤会更有意义,如果您想调用
add_task_queue
.
class SetQueue(bootsteps.StartStopStep):
requires = ('celery.worker.consumer:Tasks', )
Run Code Online (Sandbox Code Playgroud)
您可以在此处查看启动组件概述:http : //docs.celeryproject.org/en/latest/userguide/extending.html#blueprints
此外,注册自定义引导步骤后,您可以使用以下方法生成此图的新版本:
$ celery -A proj graph bootsteps | dot -Tpng -o steps.png
Run Code Online (Sandbox Code Playgroud)