Celery+Django -- 使用 Django 消息框架对状态进行轮询任务并报告成功或失败

Dan*_*n K 5 javascript python django jquery celery

在我使用 Celery 的 Django 项目(以及其他一些东西)中,我有一个 Celery 任务,它将在后台将文件上传到数据库。我使用轮询来跟踪上传进度并显示上传进度条。以下是一些详细说明上传过程的片段:

视图.py:

from .tasks import upload_task
...

upload_task.delay(datapoints, user, description) # datapoints is a list of dictionaries, user and description are simple strings
Run Code Online (Sandbox Code Playgroud)

任务.py:

from taskman.celery import app, DBTask # taskman is the name of the Django app that has celery.py
from celery import task, current_task

@task(base=DBTask)
def upload_task(datapoints, user, description):
    from utils.db.databaseinserter import insertIntoDatabase
    for count in insertIntoDatabase(datapoints, user, description):
        percent_completion = int(100 * (float(count) / float(len(datapoints))))
        current_task.update_state(state='PROGRESS', meta={'percent':percent_completion})
Run Code Online (Sandbox Code Playgroud)

数据库插入器.py:

def insertIntoDatabase(datapoints, user, description):
    # iterate through the datapoints and upload them one by one
    # at the end of an iteration, yield the number of datapoints completed so far
Run Code Online (Sandbox Code Playgroud)

上传代码一切正常,进度条也正常。但是,我不确定如何发送一条 Django 消息告诉用户上传已完成(或者,如果发生错误,发送一条 Django 消息通知用户错误)。上传开始时,我在 views.py 中执行此操作:

from django.contrib import messages
...

messages.info(request, "Upload is in progress")
Run Code Online (Sandbox Code Playgroud)

当上传成功时,我想做这样的事情:

messages.info(request, "Upload successful!")
Run Code Online (Sandbox Code Playgroud)

我不能在 views.py 中做到这一点,因为 Celery 任务是一劳永逸的。有没有办法在 celery.py 中做到这一点?在我DBTask的 celery.py 类中,我已经定义on_successon_failure定义了,所以我可以从那里发送 Django 消息吗?

此外,虽然我的投票在技术上有效,但目前并不理想。轮询目前的工作方式是,无论任务是否正在进行,它都会无休止地检查任务。它很快淹没了服务器控制台日志,我可以想象它会对整体性能产生负面影响。我对编写轮询代码还很陌生,所以我不完全确定最佳实践,以及如何仅在需要时进行轮询。处理不断轮询和服务器日志堵塞的最佳方法是什么?下面是我的投票代码。

视图.py:

def poll_state(request):
    data = 'Failure'
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            data = task.result or task.state
            if data == 'SUCCESS' or data == 'FAILURE': # not sure what to do here; what I want is to exit the function early if the current task is already completed
                return HttpResponse({}, content_type='application/json')
        else:
            data ='No task_id in the request'
            logger.info('No task_id in the request')
    else:
        data = 'Not an ajax request'
        logger.info('Not an ajax request')

    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')
Run Code Online (Sandbox Code Playgroud)

以及相应的jQuery代码:

{% if task_id %}
    jQuery(document).ready(function() {
        var PollState = function(task_id) {
            jQuery.ajax({
                url: "poll_state",
                type: "POST",
                data: "task_id=" + task_id,
            }).done(function(task) {
                if (task.percent) {
                    jQuery('.bar').css({'width': task.percent + '%'});
                    jQuery('.bar').html(task.percent + '%');
                }
                else {
                    jQuery('.status').html(task);
                };
                PollState(task_id);
            });
        }
        PollState('{{ task_id }}');
    })
{% endif %}
Run Code Online (Sandbox Code Playgroud)

(最后两个片段主要来自之前关于 Django+Celery 进度条的 StackOverflow 问题。)

Dan*_*n K 0

感谢 Josh K 提供的使用技巧setTimeout。不幸的是,我永远无法弄清楚中间件方法,因此我将采用一种更简单的方法来发送 HttpResponse,如下所示poll_state

if data == "SUCCESS":
    return HttpResponse(json.dumps({"message":"Upload successful!", "state":"SUCCESS"}, content_type='application/json'))
elif data == "FAILURE":
    return HttpResponse(json.dumps({"message":"Error in upload", "state":"FAILURE"}, content_type='application/json'))
Run Code Online (Sandbox Code Playgroud)

目的是根据收到的 JSON 简单地呈现成功或错误消息。现在出现了新问题,但这些是针对不同问题的。