芹菜任务 - 如何获得自定义状态?

Jas*_*son 6 python task celery celery-task

我有许多Celery任务,它们是长期运行的进程.因此,我想实现自定义状态以查询其进度.

根据文档,为给定​​任务实现自定义状态很容易.

def download_count(wget_base_path):
    # recursively traverse root folder and return count of files
    return sum([len(files) for r, d, files in os.walk(wget_base_path)])

@app.task(bind = True)
def html_download(self, url='', cl_id=-1):

    log = get_logger(__name__)
    ...
    # wget download location
    wget_base_path = settings.WGET_PATH + str(cl_id) 

    os.system(wget_cmd)

    if not self.request.called_directly:
        log.debug('State progress called')
        self.update_state(state = 'PROGRESS', meta = {'item_count' : download_count(wget_base_path)})
Run Code Online (Sandbox Code Playgroud)

现在,当我打电话给这个时

from app.ingest.tasks import html

ingest = html.html_download.delay(url, 54431)
Run Code Online (Sandbox Code Playgroud)

这项工作按预期开始.但是每当我尝试获得更新状态时,我都不会获得任何元数据.

例如,

In [6]: ingest.state
Out[6]: 'PENDING'


In [10]: ingest._get_task_meta()
Out[10]: {'result': None, 'status': 'PENDING'}
Run Code Online (Sandbox Code Playgroud)

是否可以阻止wget命令的os.system调用阻塞所有内容?如果我使用subprocess,则在chold进程执行时任务完成得非常快.

小智 1

Celery 将结果状态存储在后端。默认情况下,如果配置了后端并且 celery 找不到有关作为输入提供的 task_id 的任何详细信息,则会发送PENDING状态作为响应。针对目前的情况,我想到两种可能性:

  1. 该任务实际上可能正在处理中,尚未到达更新部分。
  2. Celery 具有task_result_expires配置值,该值会在该时间间隔后删除任务元数据。因此,此后进行的任何查询都会导致PENDING状态。