我正在使用芹菜(并发池的并发性= 1),我希望能够在特定任务运行后关闭工作人员.需要注意的是,我希望避免工人在那之后接受任何进一步任务的任何可能性.
这是我在大纲中的尝试:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun
app = Celery()
app.config_from_object('celeryconfig')
@app.task
def add(x, y):
return x + y
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
raise WorkerShutdown()
Run Code Online (Sandbox Code Playgroud)
但是,当我运行工人时
celery -A celeryapp worker --concurrency=1 --pool=solo
Run Code Online (Sandbox Code Playgroud)
并运行任务
add.delay(1,4)
Run Code Online (Sandbox Code Playgroud)
我得到以下内容:
-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
Run Code Online (Sandbox Code Playgroud)
该任务重新排队,并将在另一个工作人员上再次运行,从而导致循环.
当我WorkerShutdown在任务本身内移动异常时也会发生这种情况.
@app.task
def add(x, y):
print(x + y)
raise WorkerShutdown()
Run Code Online (Sandbox Code Playgroud)
有没有办法可以在特定任务后关闭工人,同时避免这种不幸的副作用?
建议的关闭工作者的过程是发送TERM信号。这将导致芹菜工作者在完成任何当前正在运行的任务后关闭。如果您向QUIT工作人员的主流程发送信号,工作人员将立即关闭。
但是,celery文档通常从命令行或通过systemd / initd管理celery方面进行讨论,但是celery还通过提供了远程工作者控制API celery.app.control。
您可以撤消任务以防止工作人员执行任务。这样可以防止您遇到循环。此外,控制也以这种方式支持工人的停工。
因此,我想以下内容将使您获得所需的行为。
@app.task(bind=True)
def shutdown(self):
app.control.revoke(self.id) # prevent this task from being executed again
app.control.shutdown() # send shutdown signal to all workers
Run Code Online (Sandbox Code Playgroud)
由于当前不可能从任务中确认该任务,然后继续执行该任务,因此这种使用方法revoke可以避免此问题,因此,即使再次将任务排队,新工作人员也将简单地忽略它。
或者,以下内容也将阻止重新交付的任务第二次执行...
@app.task(bind=True)
def some_task(self):
if self.request.delivery_info['redelivered']:
raise Ignore() # ignore if this task was redelivered
print('This should only execute on first receipt of task')
Run Code Online (Sandbox Code Playgroud)
还值得注意的是AsyncResult还有一种revoke方法可以self.app.control.revoke为您服务。
如果您关闭工作线程,则任务完成后,它不会再次重新排队。
@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
app.control.broadcast('shutdown')
Run Code Online (Sandbox Code Playgroud)
这将在任务完成后正常关闭工作线程。
[2018-04-01 18:44:14,627: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-04-01 18:44:14,656: INFO/MainProcess] mingle: searching for neighbors
[2018-04-01 18:44:15,719: INFO/MainProcess] mingle: all alone
[2018-04-01 18:44:15,742: INFO/MainProcess] celery@foo ready.
[2018-04-01 18:46:28,572: INFO/MainProcess] Received task: celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b]
[2018-04-01 18:46:28,585: INFO/ForkPoolWorker-4] Task celery_worker_stop.add[ac8a65ff-5aad-41a6-a2d6-a659d021fb9b] succeeded in 0.005628278013318777s: 3
[2018-04-01 18:46:28,665: WARNING/MainProcess] Got shutdown from remote
Run Code Online (Sandbox Code Playgroud)
注意:广播将关闭所有工作人员。如果您想关闭特定的工作程序,请使用名称启动工作程序
celery -A celeryapp worker -n self_killing --concurrency=1 --pool=solo
Run Code Online (Sandbox Code Playgroud)
现在您可以使用目标参数关闭它。
app.control.broadcast('shutdown', destination=['celery@self_killing'])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3092 次 |
| 最近记录: |