Celery工作人员如何从多个队列消费决定从哪个队列消费?

Jon*_*ans 24 scheduled-tasks celery celeryd

我使用Celery来执行异步后台任务,Redis作为后端.我对Celery工作人员在以下情况下的行为感兴趣:

我正在使用一个守护进程作为守护进程celeryd.已通过该-Q选项为此工作人员分配了两个队列以供使用:

celeryd -E -Q queue1,queue2
Run Code Online (Sandbox Code Playgroud)

工作人员如何决定从哪里获取下一个要使用的任务?是否随机消耗任何一个任务queue1queue2?它会优先获取,queue1因为它是传递给参数列表中的第一个-Q吗?

Tro*_*roy 13

从我的测试来看,它处理多个队列循环风格.

如果我使用此测试代码:

from celery import task
import time


@task
def my_task(item_id):
    time.sleep(0.5)
    print('Processing item "%s"...' % item_id)


def add_items_to_queue(queue_name, items_count):
    for i in xrange(0, items_count):
        my_task.apply_async(('%s-%d' % (queue_name, i),), queue=queue_name)


add_items_to_queue('queue1', 10)
add_items_to_queue('queue2', 10)
add_items_to_queue('queue3', 5)
Run Code Online (Sandbox Code Playgroud)

并使用(使用django-celery)启动队列:

`manage.py celery worker -Q queue1,queue2,queue3`
Run Code Online (Sandbox Code Playgroud)

它输出:

Processing item "queue1-0"...
Processing item "queue3-0"...
Processing item "queue2-0"...
Processing item "queue1-1"...
Processing item "queue3-1"...
Processing item "queue2-1"...
Processing item "queue1-2"...
Processing item "queue3-2"...
Processing item "queue2-2"...
Processing item "queue1-3"...
Processing item "queue3-3"...
Processing item "queue2-3"...
Processing item "queue1-4"...
Processing item "queue3-4"...
Processing item "queue2-4"...
Processing item "queue1-5"...
Processing item "queue2-5"...
Processing item "queue1-6"...
Processing item "queue2-6"...
Processing item "queue1-7"...
Processing item "queue2-7"...
Processing item "queue1-8"...
Processing item "queue2-8"...
Processing item "queue1-9"...
Processing item "queue2-9"...
Run Code Online (Sandbox Code Playgroud)

因此,在进入下一个queue1项之前,它会从每个队列中提取一个项目,即使所有queue1任务都是在queue2 和3任务之前发布的.

注意:正如@WarLord所指出的,这种确切的行为只有在CELERYD_PREFETCH_MULTIPLIER设置为1 时才有效.如果它大于1,那么这意味着将批量从队列中提取项目.因此,如果您有PREFETCH_MULTIPLIER设置为4的4个进程,这意味着将从队列中拔出16个项目,因此您将无法获得上面的确切输出,但它仍将大致遵循循环.

  • 在 2019 年,这不是正确的答案,请参阅来自 @crazyshezy 的正确和更新的订单。现在的订单基于到达队列。 (2认同)

Cra*_*ezy 5

注意:这个答案已被弃用:Celery的最新版本与2013年的版本大不相同...

消耗多个队列的工作人员消耗任务,FIFO命令也在多个队列中维护.

例:

队列1:(t1,t2,t5,t7)
队列2:(t0,t3,t4,t6)

假设0-7表示已发布任务的顺序

消耗顺序为t0,t1,t2,t3,t4,t5,t6,t7

  • 有没有办法修改这种行为?也许优先考虑一个队列而不是另一个队列:t1,t2,t5,t7,t0,t3,t4,t6. (3认同)
  • 从我的测试中,它处理队列循环风格,因此,在您的示例中,消耗顺序将是t1,t0,t2,t3,t5,t4,t7,t6.但是,如果您将PREFETCH_MULTIPLIER设置为1以外的顺序,则顺序可能会有所不同,并且任务需要不同的时间才能完成. (3认同)
  • RabbitMQ 不行,但可以使用其他消息传输(例如 Redis),但这会涉及子类化。请注意,您描述的顺序实际上是饥饿,如果 Queue1 持续繁忙,则 Queue2 中的消息将永远不会被处理。 (2认同)