芹菜一个经纪人多个队列和工人

cra*_*aro 1 python rabbitmq celery

我有一个调用的python文件tasks.py,其中我定义了4个单独的任务.我想配置芹菜以使用4个队列,因为每个队列将分配不同数量的工作人员.我正在阅读我应该使用route_task属性,但我尝试了几个选项而不是成功.

我正在关注这个doc celery route_tasks docs

我的目标是运行4个工作人员,每个任务一个,不要混合不同队列中不同工作人员的任务.这是可能的?这是一个很好的方法吗?

如果我做错了什么,我很乐意改变我的代码以使其工作

到目前为止,这是我的配置

tasks.py

app = Celery('tasks', broker='pyamqp://guest@localhost//')
app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('queueA',    routing_key='tasks.task_1'),
    Queue('queueB',    routing_key='tasks.task_2'),
    Queue('queueC',    routing_key='tasks.task_3'),
    Queue('queueD',    routing_key='tasks.task_4')
)


@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"


@app.task
def task_3():
    print "Task of level 3"


@app.task
def task_4():
    print "Task of level 4"
Run Code Online (Sandbox Code Playgroud)

为每个队列运行芹菜一名工人

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
celery -A tasks worker --loglevel=debug -Q queueC --logfile=celery-C.log -n W3&
celery -A tasks worker --loglevel=debug -Q queueD --logfile=celery-D.log -n W4&
Run Code Online (Sandbox Code Playgroud)

Chi*_*and 5

没有必要进入复杂的路由以将任务提交到不同的队列.像往常一样定义任务.

from celery import celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def task_1():
    print "Task of level 1"


@app.task
def task_2():
    print "Task of level 2"
Run Code Online (Sandbox Code Playgroud)

现在在排队任务时,将任务放在适当的队列中.这是一个如何做到这一点的例子.

In [12]: from tasks import *

In [14]: result = task_1.apply_async(queue='queueA')

In [15]: result = task_2.apply_async(queue='queueB')
Run Code Online (Sandbox Code Playgroud)

这将使task_1命名队列queueAtask_2queueB.

现在你可以让你的工人去消费它们.

celery -A tasks worker --loglevel=debug -Q queueA --logfile=celery-A.log -n W1&
celery -A tasks worker --loglevel=debug -Q queueB --logfile=celery-B.log -n W2&
Run Code Online (Sandbox Code Playgroud)