如何使用Celery和Django将任务路由到不同的队列

And*_*der 5 django celery python-3.x django-celery python-3.6

我使用以下堆栈:

  • Python 3.6
  • Celery v4.2.1(经纪人:RabbitMQ v3.6.0)
  • Django v2.0.4.

根据Celery的文档,在不同队列上运行计划任务应该像在CELERY_ROUTES上为任务定义相应队列一样简单,但是所有任务似乎都在Celery的默认队列上执行.

这是my_app/settings.py上的配置:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}
Run Code Online (Sandbox Code Playgroud)

任务只是用于测试路由的简单脚本:

文件app1/tasks.py:

from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

文件app2/tasks.py:

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

当我使用所有必需的队列运行Celery时:

celery -A my_app worker -B -l info -Q celery,queue1,queue2
Run Code Online (Sandbox Code Playgroud)

RabbitMQ将显示只有默认队列" celery "正在运行任务:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0
Run Code Online (Sandbox Code Playgroud)

有人知道如何解决这种意外行为吗?

问候,

And*_*der 8

我有它工作,这里有几点需要注意:

根据Celery的4.2.0文档,CELERY_ROUTES应该是定义队列路由的变量,但它只适用于我使用CELERY_TASK_ROUTES.任务路由似乎独立于Celery Beat,因此这仅适用于手动安排的任务:

app1_test.delay()
app2_test.delay()
Run Code Online (Sandbox Code Playgroud)

要么

app1_test.apply_async()
app2_test.apply_async()
Run Code Online (Sandbox Code Playgroud)

为了使它与Celery Beat一起使用,我们只需要明确定义队列.文件my_app/settings.py的最终设置如下:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}
Run Code Online (Sandbox Code Playgroud)

我希望这可以为其他开发人员节省一些时间.

  • "CELERY_ROUTES"与"CELERY_TASK_ROUTES"混淆的解释:`CELERY_ROUTES`是旧的芹菜设置名称,现在已被`task_routes`取代.但是_django_设置文件中的_celery_设置必须是大写的(例如`TASK_ROUTES`).为了避免与其他django设置冲突,建议使用`CELERY_`为celery设置添加前缀,从而产生`CELERY_TASK_ROUTES`.通过执行以下操作来加载:`app.config_from_object('django.conf:settings',namespace ='CELERY')`.因此`CELERY_TASK_ROUTES`只是新设置名称的上限和前缀更改. (3认同)
  • 这在 celery v5.0 中也运行良好,谢谢 (2认同)

JPG*_*JPG 7

queue装饰器添加参数可能会帮助您,

@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)
Run Code Online (Sandbox Code Playgroud)

  • 我放弃了配置选项“celery_task_default_queue”、“task_default_queue”、“task_routes”。这些都没有起到任何作用。唯一有用的是装饰器,谢谢 JPG。 (5认同)
  • 感谢@JPG,这将是一个有效的选择,尽管如此,我还是更喜欢在Django设置文件上定义队列,以获得更大的灵活性。这样,我可以根据环境使用不同的队列名称:测试,分段,生产 (2认同)