我正在尝试从 Celery 中检索所有当前pending和active任务。
active,scheduled并且reserved很容易获得:
i = celery_app.control.inspect()
tasks=list()
for k in i.active():
tasks += i.active()[k]
for k in i.reserved():
tasks += i.reserved()[k]
for k in i.scheduled():
tasks += i.scheduled()[k]
Run Code Online (Sandbox Code Playgroud)
但是,这不包括尚未完成scheduled或reserved尚未完成的任务。以下代码检索这些任务,但显然仅适用于 Redis,假设是 JSON 序列化,并且由于各种原因并不理想。
r = redis.StrictRedis(
host='localhost',
port='6379',
)
tasks = [json.loads(pending_task)['headers'] for pending_task in r.lrange('celery', 0, -1)]
Run Code Online (Sandbox Code Playgroud)
有没有办法以pending类似于inspect.active()易于访问的形式检索这些任务args, kwargs, id(最好通过 Celery,所以我不需要指定 Celery 后端和序列化等)?
编辑(对冲我的赌注)
据我所知,这并不简单,i.pending()芹菜中也没有。所以,我有一些子问题可以让我自己写这个。
queues = [q['exchange']['name'] for host in i.active_queues().values() for q in host])r.lrange('celery', 0, -1)。json.loads(...) abovecelery_app.backend.decode有了这些信息,从消息头中提取必要的信息应该很简单。