Django/Celery在localhost上有多个队列 - 路由无法正常工作

Nea*_*ara 11 python django celery celerybeat

我跟着celery docs在我的开发机器上定义了2个队列.

我的芹菜设置:

CELERY_ALWAYS_EAGER = True
CELERY_TASK_RESULT_EXPIRES = 60  # 1 mins
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1
CELERY_CREATE_MISSING_QUEUES = True
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='arena.social.tasks.#'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'fs_feeds',
    },
}
Run Code Online (Sandbox Code Playgroud)

我在我的项目的virtualenv中打开了两个终端窗口,并运行以下命令:

terminal_1$ celery -A arena worker -Q default -B -l debug --purge -n deafult_worker
terminal_2$ celery -A arena worker -Q feeds -B -l debug --purge -n feeds_worker
Run Code Online (Sandbox Code Playgroud)

我得到的是所有任务都由两个队列处理.

我的目标是让一个队列只处理在其中定义的一个任务CELERY_ROUTES和默认队列来处理所有其他任务.

我也按照这个SO问题,rabbitmqctl list_queues返回celery 0并运行两次rabbitmqctl list_bindings返回exchange celery queue celery [].重启兔子服务器没有改变任何东西.

Nea*_*ara 23

好的,所以我想出来了.以下是我的整个设置,设置以及如何运行芹菜,对于那些可能想知道与我的问题一样的事情的人.

设置

CELERY_TIMEZONE = TIME_ZONE
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERYD_CONCURRENCY = 2
CELERYD_MAX_TASKS_PER_CHILD = 4
CELERYD_PREFETCH_MULTIPLIER = 1

# celery queues setup
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic'
CELERY_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('feeds', Exchange('feeds'), routing_key='long_tasks'),
)
CELERY_ROUTES = {
    'arena.social.tasks.Update': {
        'queue': 'feeds',
        'routing_key': 'long_tasks',
    },
}
Run Code Online (Sandbox Code Playgroud)

如何经营芹菜?

终端 - 标签1:

celery -A proj worker -Q default -l debug -n default_worker
Run Code Online (Sandbox Code Playgroud)

这将启动第一个使用默认队列任务的worker.注意!-n default_worker对于第一个工作人员来说不是必须的,但如果你有任何其他的芹菜实例正在运行,那么这是必须的.设置-n worker_name--hostname=default@%h.相同.

终端 - 标签2:

celery -A proj worker -Q feeds -l debug -n feeds_worker
Run Code Online (Sandbox Code Playgroud)

这将启动第二个工作人员,消费者任务从源队列.请注意-n feeds_worker,如果您正在运行-l debug(log level = debug),您将看到两个工作程序正在它们之间进行同步.

终端 - 标签3:

celery -A proj beat -l debug
Run Code Online (Sandbox Code Playgroud)

这将启动节拍,根据您的计划执行任务CELERYBEAT_SCHEDULE.我没有必要改变任务,或者CELERYBEAT_SCHEDULE.

例如,这是我CELERYBEAT_SCHEDULE应该进入Feed队列的任务的方式:

CELERYBEAT_SCHEDULE = {
    ...
    'update_feeds': {
        'task': 'arena.social.tasks.Update',
        'schedule': crontab(minute='*/6'),
    },
    ...
}
Run Code Online (Sandbox Code Playgroud)

如您所见,无需添加'options': {'routing_key': 'long_tasks'}或指定应该进入的队列.另外,如果你想知道为什么Update是大写的,那是因为它是一个自定义任务,它被定义为子类celery.Task.