如何使用芹菜检索所有挂起和活动的任务

Gre*_*son 5 python celery

我正在尝试从 Celery 中检索所有当前pendingactive任务。

activescheduled并且reserved很容易获得:

i = celery_app.control.inspect()
tasks=list()
for k in i.active():
    tasks += i.active()[k]
for k in i.reserved():
    tasks += i.reserved()[k]
for k in i.scheduled():
    tasks += i.scheduled()[k]
Run Code Online (Sandbox Code Playgroud)

但是,这不包括尚未完成scheduledreserved尚未完成的任务。以下代码检索这些任务,但显然仅适用于 Redis,假设是 JSON 序列化,并且由于各种原因并不理想。

r = redis.StrictRedis(
        host='localhost',
        port='6379',
    )
tasks = [json.loads(pending_task)['headers'] for pending_task in r.lrange('celery', 0, -1)]
Run Code Online (Sandbox Code Playgroud)

有没有办法以pending类似于inspect.active()易于访问的形式检索这些任务args, kwargs, id(最好通过 Celery,所以我不需要指定 Celery 后端和序列化等)?

编辑(对冲我的赌注)

据我所知,这并不简单,i.pending()芹菜中也没有。所以,我有一些子问题可以让我自己写这个。

  1. 如何获取活动 celery 队列的列表?(以上仅假设默认队列)也许像queues = [q['exchange']['name'] for host in i.active_queues().values() for q in host])
  2. 如何一般地获取这些队列中的消息?这将是上述的通用版本r.lrange('celery', 0, -1)
  3. 如何一般地解码/反序列化消息?这将是一个通用版本json.loads(...) above celery_app.backend.decode

有了这些信息,从消息头中提取必要的信息应该很简单。