检索Celery中队列中的任务列表

bra*_*ers 136 python celery

如何检索队列中尚未处理的任务列表?

小智 164

编辑:请参阅其他答案以获取队列中的任务列表.

你应该看看这里: 芹菜指南 - 检查工人

基本上这个:

>>> from celery.task.control import inspect

# Inspect all nodes.
>>> i = inspect()

# Show the items that have an ETA or are scheduled for later processing
>>> i.scheduled()

# Show tasks that are currently active.
>>> i.active()

# Show tasks that have been claimed by workers
>>> i.reserved()
Run Code Online (Sandbox Code Playgroud)

取决于你想要的

  • 这不会返回队列中尚未处理的任务列表. (38认同)
  • 我试过了,但它真的很慢(比如1秒).我在龙卷风应用程序中同步使用它来监视进度,所以它必须很快. (9认同)
  • 使用`i.reserved()`获取排队任务列表. (9认同)
  • 在指定worker时我必须使用list作为参数:`inspect(['celery @ Flatty'])`.比`inspect()`提高了速度. (5认同)
  • 有没有人经历过i.reserved()没有准确的活动任务列表?我有运行的任务没有显示在列表中.我在django-celery上== 3.1.10 (4认同)
  • @JulienFr如果在检查时使用工人的名字,则需要一秒而不是一分钟.即`i = inspect('celery @ mysite')` (4认同)
  • @Banana - 不是'reserved()`只显示工人预取的任务吗?这不会显示整个队列,对吗?如果我禁用了预取怎么办?请参阅:http://docs.celeryproject.org/en/latest/userguide/monitoring.html#commands (4认同)
  • @ Buttons840 - 是的,``reserved()``只显示预取的任务,似乎(即使预取乘数为1).要获取仍在您的代理队列中但尚未由Celery检索的消息的统计信息,您需要使用其他答案中提到的``amqplib``或``rabbitmqctl``技术. (3认同)
  • 这不回答这个问题.我不知道为什么接受这个答案...... :) (3认同)
  • @Seperman:从我目前有限的理解来看,这似乎与 celery 的 `worker_prefetch_multiplier` 设置有关。当我增加队列的并发度时,与使用较低并发度时相比,出现了更多的待处理任务。这似乎符合http://docs.celeryproject.org/en/latest/userguide/optimizing.html#prefetch-limits (2认同)
  • 这曾经在Celery 3. *上对我有用,但在Celery 4. *上不再起作用。即使正在运行的任务很长,它也会返回空列表。 (2认同)
  • 它也为我返回 None (2认同)
  • 这是一个错误的答案。Celery 只报告已分配给工作人员的任务。问题是关于在工作人员领取之前积压的任务。要回答这个问题,您需要检查消息代理(rediis 或rabbitmq) (2认同)

Ali*_*Ali 40

如果您使用的是rabbitMQ,请在终端中使用:

sudo rabbitmqctl list_queues
Run Code Online (Sandbox Code Playgroud)

它将打印具有待处理任务数的队列列表.例如:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0
Run Code Online (Sandbox Code Playgroud)

右列中的数字是队列中的任务数.在上面,芹菜队列有166个待处理任务.

  • 当我拥有 sudo 权限时,我对此很熟悉,但我希望非特权的系统用户能够检查 - 有什么建议吗? (2认同)

mli*_*ner 16

如果您不使用优先级任务,如果您使用Redis ,这实际上非常简单.要完成任务计数:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME
Run Code Online (Sandbox Code Playgroud)

但是,优先任务在redis中使用不同的键,因此整个画面稍微复杂一些.全貌是您需要为每个任务优先级查询redis.在python(以及Flower项目)中,这看起来像:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])
Run Code Online (Sandbox Code Playgroud)

如果您想获得实际任务,可以使用以下内容:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1
Run Code Online (Sandbox Code Playgroud)

从那里你必须反序列化返回的列表.在我的情况下,我能够通过以下方式完成此任务:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))
Run Code Online (Sandbox Code Playgroud)

请注意,反序列化可能需要一些时间,您需要调整上面的命令以使用各种优先级.

  • 我已经更新了上面的内容来处理优先任务。进步! (2认同)
  • 需要说明的是,默认使用的“DATABASE_NUMBER”为“0”,“QUEUE_NAME”为“celery”,因此“redis-cli -n 0 llen celery”将返回排队消息的数量。 (2认同)
  • 它总是返回 0。 (2认同)
  • 我在此解决方案中遇到的问题:如果您撤销正在队列中等待的 celery 任务,它会保留在 redis 队列中。lrange 返回的任务数量不正确。 (2认同)

ash*_*ish 11

要从后端检索任务,请使用此方法

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
Run Code Online (Sandbox Code Playgroud)

  • 请参阅 /sf/answers/4046553941/ 获取为您提供任务名称的相关答案。 (3认同)
  • 但是“工作”仅给出队列中的任务数 (2认同)

小智 7

celery 检查模块似乎只从工作人员的角度了解任务。如果你想查看队列中的消息(尚未被工作人员拉取),我建议使用pyrabbit,它可以与rabbitmq http api接口以从队列中检索各种信息。

可以在此处找到示例: 使用 Celery (RabbitMQ、Django) 检索队列长度


Max*_*ysh 6

使用json序列化的Redis复制粘贴解决方案:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks
Run Code Online (Sandbox Code Playgroud)

它适用于Django。只是不要忘记改变yourproject.celery

  • 如果您使用的是 pickle 序列化程序,那么您可以将 `body =` 行更改为 `body = pickle.loads(base64.b64decode(j['body']))`。 (2认同)

om2*_*0de 6

如果您使用的是Celery + Django的最简单方法来检查任务,请直接从虚拟环境中的终端中使用命令或使用celery 的完整路径

Dochttp : //docs.celeryproject.org/en/latest/userguide/workers.html? highlight = revoke#inspecting-workers

$ celery inspect reserved
$ celery inspect inspect
$ celery inspect registered
$ celery inspect scheduled
Run Code Online (Sandbox Code Playgroud)

另外,如果您使用的是Celery + RabbitMQ,则可以使用以下命令检查队列列表

更多信息https : //linux.die.net/man/1/rabbitmqctl

$ sudo rabbitmqctl list_queues
Run Code Online (Sandbox Code Playgroud)

  • 这又不能回答问题。 (11认同)
  • 如果你有一个定义项目,你可以使用`celery -A my_proj检查保留` (4认同)

Cal*_*ing 6

这在我的应用程序中对我有用:

def get_celery_queue_active_jobs(queue_name):
    connection = <CELERY_APP_INSTANCE>.connection()

    try:
        channel = connection.channel()
        name, jobs, consumers = channel.queue_declare(queue=queue_name, passive=True)
        active_jobs = []

        def dump_message(message):
            active_jobs.append(message.properties['application_headers']['task'])

        channel.basic_consume(queue=queue_name, callback=dump_message)

        for job in range(jobs):
            connection.drain_events()

        return active_jobs
    finally:
        connection.close()
Run Code Online (Sandbox Code Playgroud)

active_jobs 将是与队列中的任务相对应的字符串列表。

不要忘记将 CELERY_APP_INSTANCE 换成你自己的。

感谢@ashish 指出我正确的方向,他的回答在这里:https ://stackoverflow.com/a/19465670/9843399

  • @CalebSyring这是第一种真正向我展示排队任务的方法。很不错。对我来说唯一的问题是列表附加似乎不起作用。有什么想法可以让回调函数写入列表吗? (2认同)

Seb*_*ask 5

我认为获取正在等待的任务的唯一方法是保留您启动的任务列表,并让任务在启动时从列表中删除自己。

使用rabbitmqctl和list_queues,您可以了解有多少任务正在等待,但不能了解任务本身:http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

如果您想要的包括正在处理但尚未完成的任务,您可以保留任务列表并检查它们的状态:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished
Run Code Online (Sandbox Code Playgroud)

或者你让 Celery 使用 CELERY_RESULT_BACKEND 存储结果,并检查哪些任务不在那里。


Dej*_*kic 5

据我所知,Celery 没有提供用于检查队列中等待的任务的 API。这是经纪人特定的。如果您使用 Redis 作为代理,那么检查在celery(默认)队列中等待的任务就非常简单:

  1. 连接到经纪人
  2. 列出celery列表中的项目(以 LRANGE 命令为例)

请记住,这些任务正在等待可用的工作人员来选择。您的集群可能正在运行一些任务 - 这些任务不会在此列表中,因为它们已被选择。

在特定队列中检索任务的过程是特定于代理的。