nab*_*zan 178 task rabbitmq celery celery-task
如何在不知道task_id
每个任务的情况下删除所有待处理任务?
Phi*_*ham 269
来自文档:
$ celery -A proj purge
Run Code Online (Sandbox Code Playgroud)
要么
from proj.celery import app
app.control.purge()
Run Code Online (Sandbox Code Playgroud)
(编辑:用当前方法更新.)
Too*_*ink 116
对于芹菜3.0+:
$ celery purge
Run Code Online (Sandbox Code Playgroud)
要清除特定队列:
$ celery -Q queue_name purge
Run Code Online (Sandbox Code Playgroud)
smi*_*ido 24
例如,使用带-Q参数的worker来定义队列时
celery worker -Q queue1,queue2,queue3
Run Code Online (Sandbox Code Playgroud)
然后celery purge
将无法工作,因为你无法将队列params传递给它.它只会删除默认队列.解决方案是使用以下--purge
参数启动您的worker :
celery worker -Q queue1,queue2,queue3 --purge
Run Code Online (Sandbox Code Playgroud)
然而,这将运行工人.
其他选项是使用celery的amqp子命令
celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
Run Code Online (Sandbox Code Playgroud)
ksi*_*ndi 11
在芹菜3+:
CLI:
$ celery -A proj purge
Run Code Online (Sandbox Code Playgroud)
编程方式:
>>> from proj.celery import app
>>> app.control.purge()
Run Code Online (Sandbox Code Playgroud)
http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks
我发现这celery purge
对我更复杂的芹菜配置不起作用.我为不同的目的使用多个命名队列:
$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ... # Output sorted, whitespaced for readability
celery 0 2
celery@web01.celery.pidbox 0 1
celery@web02.celery.pidbox 0 1
apns 0 1
apns@web01.celery.pidbox 0 1
analytics 1 1
analytics@web01.celery.pidbox 0 1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1
Run Code Online (Sandbox Code Playgroud)
第一列是队列名称,第二列是队列中等待的消息数,第三列是该队列的侦听器数.队列是:
分析任务是一项在小型数据集上运行良好的强力任务,但现在需要超过24小时才能处理.偶尔会出现问题,并且会在数据库中等待.它需要重新编写,但在此之前,当它被卡住时我会终止任务,清空队列,然后再试一次.我通过查看分析队列的消息计数来检测"卡住",该消息计数应为0(完成分析)或1(等待昨晚的分析完成).2或更高是坏的,我收到一封电子邮件.
celery purge
提供从其中一个广播队列中删除任务,我没有看到选择其他命名队列的选项.
这是我的过程:
$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
Run Code Online (Sandbox Code Playgroud)
如果您想删除所有挂起的任务以及活动和保留的任务以完全停止 Celery,这对我有用:
from proj.celery import app
from celery.task.control import inspect, revoke
# remove pending tasks
app.control.purge()
# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
Run Code Online (Sandbox Code Playgroud)
在芹菜3+
http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks
CLI
清除命名队列:
celery -A proj amqp queue.purge <queue name>
Run Code Online (Sandbox Code Playgroud)
清除已配置的队列
celery -A proj purge
Run Code Online (Sandbox Code Playgroud)
我已经清除了消息,但队列中仍然留有消息?答案:任务一实际执行就会被确认(从队列中删除).在工作人员收到任务后,它将需要一些时间才能实际执行,特别是如果有许多任务已经等待执行.未确认的消息由工作人员保留,直到它关闭与代理(AMQP服务器)的连接.当该连接关闭时(例如,因为工作程序已停止),代理将将任务重新发送给下一个可用工作程序(或重新启动时的同一工作程序),以便正确清除等待任务的队列必须停止所有工作者,然后使用celery.control.purge()清除任务.
因此,要清除整个队列,必须停止工作.
对于 Celery 5.0+,从 CLI 执行此操作并针对特定队列:
celery -A APP_NAME purge --queues QUEUE_NAME
-f
如果您想像我一样在步骤中执行此操作,请在末尾添加选项以跳过确认步骤。