celery - 需要优先运行的任务

fab*_*ols 16 django queue celery

在我的网站中,用户可以在每次需要时更新他们的个人资料(手动),或者每天自动更新一次.

现在正在与芹菜一起分发这项任务.

但我有一个"问题":

每天,在自动更新中,作业将所有用户(+ -6k用户)放在队列中:

from celery import group
from tasks import *
import datetime
from lastActivityDate.models import UserActivity

today   = datetime.datetime.today()
one_day = datetime.timedelta(days=5)
today -= one_day

print datetime.datetime.today()

user_list = UserActivity.objects.filter(last_activity_date__gte=today)
g = group(update_user_profile.s(i.user.auth.username) for i in user_list)

print datetime.datetime.today()
print g(user_list.count()).get()
Run Code Online (Sandbox Code Playgroud)

如果有人尝试进行手动更新,他们将进入队列并永远执行.

有没有办法将此手动任务设置为以piority方式运行?或专门为每个分离的队列:手动和自动?

小智 31

Celery不支持任务优先级.(V3.0)

http://docs.celeryproject.org/en/master/faq.html#does-celery-support-task-priorities

您可以通过路由任务来解决此问题.

http://docs.celeryproject.org/en/latest/userguide/routing.html

准备默认和priority_high队列.

CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
    Queue('default'),
    Queue('priority_high'),
)
Run Code Online (Sandbox Code Playgroud)

运行两个守护进程.

user@x:/$ celery worker -Q priority_high
user@y:/$ celery worker -Q default,priority_high
Run Code Online (Sandbox Code Playgroud)

和路线任务.

your_task.apply_async(args=['...'], queue='priority_high')
Run Code Online (Sandbox Code Playgroud)

  • 实际链接 - http://docs.celeryproject.org/en/latest/userguide/routing.html#routing-options-rabbitmq-priorities (5认同)
  • 对于迟到这个答案的人,Celery现在通过RabbitMQ和Redis在不同程度上支持任务优先级 (5认同)
  • 对于任何迟到的人(像我一样); 至关重要的是要注意两个芹菜工作者在不同的主机上运行 - 即两个服务器正在使用priority_high队列,一个服务器正在消耗默认值 (2认同)
  • 现在您还可以使用[消息优先级](http://docs.celeryproject.org/en/latest/whatsnew-3.0.html#redis-priority-support) (2认同)
  • 给遇到这个答案的任何人。队列来自kombu包而不是队列包 (2认同)
  • 为什么在上面的任务路由设置示例中使高优先级队列具有高优先级?事实上,它由比默认队列更多的工作人员提供服务?我问的原因是我只有一台主机来运行一个或多个芹菜工作人员,我想优先考虑一些快速任务,但我不知道该怎么做。 (2认同)

And*_* St 21

如果您使用RabbitMQ传输,则按以下方式配置队列: settings.py

from kombu import Queue
...
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task_default.#', max_priority=10), 
    ...)
Run Code Online (Sandbox Code Playgroud)

然后运行您的任务:

my_low_prio_task.apply_async(args=(...), priority=1)
my_high_prio_task.apply_async(args=(...), priority=10)
Run Code Online (Sandbox Code Playgroud)

目前这段代码适用于 kombu==4.6.11,celery==4.4.6。

  • 正确答案。对于其他想知道的人来说,10 是最高优先级,1 是最低优先级。 (4认同)