如何在Celery中检查任务状态?

Mar*_*cin 85 python web-services celery django-celery

如何检查任务是否在芹菜中运行(具体来说,我使用的是celery-django)?

我已经阅读了文档,而且我用谷歌搜索了,但我看不到像这样的电话:

my_example_task.state() == RUNNING
Run Code Online (Sandbox Code Playgroud)

我的用例是我有一个用于转码的外部(java)服务.当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它.

我正在使用目前的稳定版本 - 2.4,我相信.

Gre*_*gor 84

返回task_id(由.delay()提供),然后向celery实例询问状态:

x = method.delay(1,2)
print x.task_id
Run Code Online (Sandbox Code Playgroud)

在询问时,使用此task_id获取新的AsyncResult:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Run Code Online (Sandbox Code Playgroud)

  • 谢谢,但如果我无法访问`x`怎么办? (7认同)
  • 你把你的工作排到芹菜的哪个地方?在那里你必须返回task_id来跟踪将来的工作. (4认同)
  • @Chris @gregor代码的争议在于`async_result`的实例化.在您的用例中,您已经拥有该实例,您很高兴.但是如果你只有task ID,并且需要实例化一个`async_result`实例才能调用`async_result.get()`会发生什么?这是`AsyncResult`类的一个实例,但你不能使用原始类`celery.result.AsyncResult`,你需要从`app.task()`包装的函数中获取类.在你的情况下你会做`async_result = run_instance.AsyncResult('task-id')` (2认同)

Mar*_*cin 57

每个Task对象都有一个.request属性,它包含它的AsyncRequest对象.因此,以下行给出了任务的状态task:

task.AsyncResult(task.request.id).state
Run Code Online (Sandbox Code Playgroud)

  • 当我这样做时,我得到一个永久的PENDING AsyncResult,即使我等待足够长的任务完成.有没有办法让这看到状态的变化?我相信我的后端已配置,我尝试设置CELERY_TRACK_STARTED = True无效. (4认同)
  • 有没有办法存储任务的进度百分比? (2认同)

Lou*_*uis 56

AsyncResult从任务ID 创建对象FAQ中建议的方式,以便在您拥有的唯一内容是任务ID时获取任务状态.

然而,就Celery 3.x而言,如果他们不注意它们,可能会有一些重要的警告.这实际上取决于具体的用例场景.

默认情况下,Celery不会记录"正在运行"状态.

为了让Celery记录任务正在运行,您必须设置task_track_startedTrue.这是一个测试这个的简单任务:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state
Run Code Online (Sandbox Code Playgroud)

如果task_track_startedFalse,默认情况下,状态显示PENDING即使任务已启动.如果设置task_track_startedTrue,则状态为STARTED.

国家的PENDING意思是"我不知道".

一个AsyncResult与状态PENDING并不意味着什么比这更芹菜不知道任务的状态.这可能是由于许多原因造成的.

首先,AsyncResult可以使用无效的任务ID构建.这些"任务"将被Celery视为未决:

>>> task.AsyncResult("invalid").status
'PENDING'
Run Code Online (Sandbox Code Playgroud)

好的,所以没有人会明显地提供无效的ID AsyncResult.很公平,但它也有效,AsyncResult也会考虑一个成功运行的任务,但Celery已经忘记了PENDING.同样,在某些用例场景中,这可能是一个问题.部分问题取决于Celery如何配置以保持任务结果,因为它取决于结果后端中"墓碑"的可用性.("Tombstones"是Celery文档中用于记录任务结束方式的数据块的术语.)AsyncResult如果task_ignore_result是,则使用将无法正常工作True.一个更令人烦恼的问题是Celery默认会使墓碑过期.该result_expires默认设置为24小时.因此,如果您启动一项任务,并在长期存储中记录该ID,并在24小时后再创建一个AsyncResult,则使用它创建一个状态PENDING.

所有"真正的任务"都从该PENDING州开始.因此PENDING,执行任务可能意味着任务被请求但从未进一步发展(无论出于何种原因).或者它可能意味着任务已经完成,但Celery忘记了它的状态.

哎哟! AsyncResult对我不起作用.我还可以做些什么?

我更愿意跟踪目标,而不是跟踪任务本身.我确实保留了一些任务信息,但它确实是继续跟踪目标.目标存储在独立于Celery的存储中.当请求需要执行计算取决于已经实现的某个目标时,它会检查目标是否已经实现,如果是,则使用此缓存目标,否则它将启动将影响目标的任务,并发送到使HTTP请求成为响应的客户端,指示它应该等待结果.


上面的变量名称和超链接适用于Celery 4.x. 在3.x中相应的变量和超链接是:CELERY_TRACK_STARTED,CELERY_IGNORE_RESULT,CELERY_TASK_RESULT_EXPIRES.


msa*_*gel 15

您还可以创建自定义状态并更新其值duting任务执行.这个例子来自docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})
Run Code Online (Sandbox Code Playgroud)

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states


小智 9

老问题,但我最近遇到了这个问题.

如果您正在尝试获取task_id,可以这样做:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
Run Code Online (Sandbox Code Playgroud)

现在您确切知道task_id是什么,现在可以使用它来获取AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
Run Code Online (Sandbox Code Playgroud)

  • 绝对不需要创建自己的任务ID并将其传递给`apply_async`.`apply_async`返回的对象是一个'AsyncResult`对象,它具有Celery生成的任务的id. (3认同)

小智 9

只需使用芹菜常见问题解答中的此 API

result = app.AsyncResult(task_id)
Run Code Online (Sandbox Code Playgroud)

这工作正常。


Adr*_*eno 6

2020年答案:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"
Run Code Online (Sandbox Code Playgroud)