Jas*_*son 6 python task celery celery-task
我有许多Celery任务,它们是长期运行的进程.因此,我想实现自定义状态以查询其进度.
根据文档,为给定任务实现自定义状态很容易.
def download_count(wget_base_path):
# recursively traverse root folder and return count of files
return sum([len(files) for r, d, files in os.walk(wget_base_path)])
@app.task(bind = True)
def html_download(self, url='', cl_id=-1):
log = get_logger(__name__)
...
# wget download location
wget_base_path = settings.WGET_PATH + str(cl_id)
os.system(wget_cmd)
if not self.request.called_directly:
log.debug('State progress called')
self.update_state(state = 'PROGRESS', meta = {'item_count' : download_count(wget_base_path)})
Run Code Online (Sandbox Code Playgroud)
现在,当我打电话给这个时
from app.ingest.tasks import html
ingest = html.html_download.delay(url, 54431)
Run Code Online (Sandbox Code Playgroud)
这项工作按预期开始.但是每当我尝试获得更新状态时,我都不会获得任何元数据.
例如,
In [6]: ingest.state
Out[6]: 'PENDING'
In [10]: ingest._get_task_meta()
Out[10]: {'result': None, 'status': 'PENDING'}
Run Code Online (Sandbox Code Playgroud)
是否可以阻止wget命令的os.system调用阻塞所有内容?如果我使用subprocess,则在chold进程执行时任务完成得非常快.
小智 1
Celery 将结果状态存储在后端。默认情况下,如果配置了后端并且 celery 找不到有关作为输入提供的 task_id 的任何详细信息,则会发送PENDING状态作为响应。针对目前的情况,我想到两种可能性:
| 归档时间: |
|
| 查看次数: |
483 次 |
| 最近记录: |