检测Celery是否可用/正在运行

Cor*_*ory 48 python django celery django-celery

我正在使用Celery来管理异步任务.然而,偶尔芹菜过程会失效,导致任何任务都无法执行.我希望能够检查芹菜的状态并确保一切正常,如果我发现任何问题,则向用户显示错误消息.从Celery Worker文档看起来我可能能够使用ping或者检查这个,但ping感觉很乱,并且不清楚究竟如何使用inspect(如果inspect().registered()是空的?).

任何有关这方面的指导将不胜感激.基本上我正在寻找的方法是这样的:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??
Run Code Online (Sandbox Code Playgroud)

编辑:它看起来甚至不像celery 2.3.3上的registered()(即使2.1文档列出它).也许ping是正确的答案.

编辑:Ping似乎也没有做我认为会做的事情,所以仍然不确定这里的答案.

小智 56

这是我一直在使用的代码. celery.task.control.Inspect.stats()返回包含有关当前可用工作程序的大量详细信息的dict,如果没有工作程序正在运行则返回None;如果IOError无法连接到消息代理程序则返回.我正在使用RabbitMQ - 其他消息传递系统可能表现得略有不同.这适用于Celery 2.3.x和2.4.x; 我不确定它会走多远.

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d
Run Code Online (Sandbox Code Playgroud)

  • 我发现上面每次运行时都会向rabbitmq添加两个reply.celery.pidbox队列.这导致rabbitmq的内存使用量逐渐增加. (9认同)
  • 这对我不起作用。使用Redis作为代理,当Redis不可用并且没有芹菜工作者在运行时,insp.stats()只会被阻止。 (2认同)

aka*_*hbw 9

如果 celery 作为守护程序运行,要使用命令行检查相同的内容,

  • 激活 virtualenv 并转到“应用程序”所在的目录
  • 现在运行: celery -A [app_name] status
  • 它将显示 celery 是否已启动以及是否已启动。在线节点数

来源:http : //michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/


Ous*_*uss 6

celery 4.2的文档中

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result
Run Code Online (Sandbox Code Playgroud)

当然您可以/应该通过错误处理来改善代码...

  • 为了仅仅检查可用性,还有一个i.ping(),它在失败时返回None。 (2认同)
  • 谢谢蒂姆。我在函数中添加了“可用性” (2认同)

Tim*_*all 6

测试是否工作人员响应的一种方法是发送“ping”广播,并在第一次响应时返回成功结果。

from .celery import app  # the celery 'app' created in your project

def is_celery_working():
    result = app.control.broadcast('ping', reply=True, limit=1)
    return bool(result)  # True if at least one result
Run Code Online (Sandbox Code Playgroud)

这会广播“ping”并等待最多一秒钟的响应。一旦第一个响应到来,它就会返回结果。如果您希望False更快地得到结果,可以添加一个timeout参数来减少放弃之前等待的时间。


Ser*_*y K 5

以下对我有用:

import socket
from kombu import Connection

celery_broker_url = "amqp://localhost"

try:
    conn = Connection(celery_broker_url)
    conn.ensure_connection(max_retries=3)
except socket.error:
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
Run Code Online (Sandbox Code Playgroud)

  • 我很确定,如果rabbitmq 正在运行,无论芹菜的状态如何,这都会成功。但是,如果 celery 不知道失败是与 rabbitmq 还是其他原因有关,则这是一个很好的检查。 (4认同)