Dav*_*ain 23 python django celery
我已将长时间运行的任务分段为逻辑子任务,因此我可以在完成时报告每个子任务的结果.但是,我正在尝试报告一个有效永远无法完成的任务的结果(而不是随着时间的推移产生值),并且我正在努力使用现有的解决方案.
我正在为我编写的一些Python程序构建一个Web界面.用户可以通过Web表单提交作业,然后回来查看作业的进度.
假设我有两个函数,每个函数都通过不同的形式访问:
med_func:执行约1分钟,结果传递给render(),产生额外的数据.long_func:返回一个生成器.每个yield大约30分钟,应该报告给用户.收益率如此之多,我们可以将此迭代器视为无限(仅在撤销时终止).有med_func,我报告结果如下:
在表单提交上,我保存AsyncResult到Django会话:
    task_result = med_func.apply_async([form], link=render.s())
    request.session["task_result"] = task_result
Run Code Online (Sandbox Code Playgroud)
结果页面的Django视图访问它AsyncResult.任务完成后,结果将保存到作为上下文传递给Django模板的对象中.
def results(request):
    """ Serve (possibly incomplete) results of a session's latest run. """
    session = request.session
    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]
            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del(request.session["task_result"])  # Don't need any more
    return render_to_response('results.html', request.session)
Run Code Online (Sandbox Code Playgroud)
此解决方案仅在函数实际终止时有效.我不能将逻辑子任务链接在一起long_func,因为有一个未知数量的yields(每个迭代的long_func's循环可能不会产生结果).
是否有任何明智的方法可以从极长时间运行的Celery任务中访问产生的对象,以便在生成器耗尽之前显示它们?
小智 23
为了让Celery知道任务的当前状态是什么,它会在您拥有的任何结果后端设置一些元数据.您可以通过它来存储其他类型的元数据.
def yielder():
    for i in range(2**100):
        yield i
@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.backend.mark_as_started(
            report_progress.request.id,
            progress=progress)
def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress
Run Code Online (Sandbox Code Playgroud)
我不会在那里投入大量数据,但它可以很好地跟踪长期运行任务的进度.
保罗的回答很棒.作为替代使用mark_as_started,你可以使用Task的update_state方法.他们最终做同样的事情,但名称"update_state"更适合你想要做的事情.您可以选择定义一个自定义状态,指示您的任务正在进行中(我将自定义状态命名为"PROGRESS"):
def yielder():
    for i in range(2**100):
        yield i
@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.update_state(state='PROGRESS', meta={'progress': progress})
def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress
Run Code Online (Sandbox Code Playgroud)
        小智 6
芹菜部分:
def long_func(*args, **kwargs):
    i = 0
    while True:
        yield i
        do_something_here(*args, **kwargs)
        i += 1
@task()
def test_yield_task(task_id=None, **kwargs):
    the_progress = 0        
    for the_progress in long_func(**kwargs):
        cache.set('celery-task-%s' % task_id, the_progress)
Run Code Online (Sandbox Code Playgroud)
Web客户端,启动任务:
r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id
Run Code Online (Sandbox Code Playgroud)
测试最后产生的价值:
   v = cache.get('celery-task-%s' % session.get('task_id'))
   if v:
        do_someting()
Run Code Online (Sandbox Code Playgroud)
如果您不喜欢使用缓存,或者不可能,则可以使用db,file或芹菜工作者和服务器端都具有访问权限的任何其他位置.使用缓存它是一个最简单的解决方案,但工作人员和服务器必须使用相同的缓存.
|   归档时间:  |  
           
  |  
        
|   查看次数:  |  
           7229 次  |  
        
|   最近记录:  |