celery通过在task_postrun信号中提升SystemExit来尝试关闭工作但是总是挂起并且主进程永远不会退出

Dar*_*ckt 6 python signals systemexit celery

我正试图通过task_postrun信号中的raisin SystemExit()关闭主芹菜过程.信号被触发就好了,异常会被提升,但是工作者永远不会完全退出并挂起.

我该如何工作?

我忘记了某个地方的设置吗?

下面是我正在为worker(worker.py)使用的代码:

from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current = True,
    broker='amqp://guest@localhost//',
    backend="mongodb://localhost//",
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)

print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")
Run Code Online (Sandbox Code Playgroud)

以下是tasks.py的代码:

from celery import Celery,task
from celery.signals import task_postrun
import time

from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    def run(config, kwargs):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery
Run Code Online (Sandbox Code Playgroud)

为了测试这个,我要做的第一件事是运行worker代码(python worker.py),然后我排队一个这样的任务:

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))
Run Code Online (Sandbox Code Playgroud)

我从工人那里看到的输出是这样的:

STARTING WORKER

 -------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)
Run Code Online (Sandbox Code Playgroud)

我期待python代码从调用返回app.worker_main()然后打印WORKER EXITED然后进程完全退出.这种情况从未发生过,并且必须kill -KILL {PID}编写工作进程才能使其消失(它也无法使用任何其他任务).

我想我的主要问题是:

我如何使代码返回app.worker_main()

我希望能够在执行了多个任务后完全重启工作进程(通过让进程完全退出)X.

更新我弄清楚了工作者正在挂起的东西 - 在捕获异常之后,worker(WorkController)挂起了一个调用.self.stopSystemExit

Dar*_*ckt 2

当我最终回答自己的问题时,我讨厌它。

无论如何,它阻塞了对Mediator内部组件的连接调用WorkControllerstop()Mediator组件的调用,内部停止,它join是)。

我通过禁用所有速率限制来摆脱该Mediator组件(默认情况下应该是这样,但并非出于某种原因)。

您可以使用以下设置禁用所有速率限制:

CELERY_DISABLE_ALL_RATE_LIMITS: True
Run Code Online (Sandbox Code Playgroud)

希望这对其他人也有帮助。

和平