从芹菜任务接收事件

Glu*_*eon 5 django celery

我有一个长期运行的芹菜任务,它迭代一系列项目并执行一些操作.

任务应该以某种方式报告它当前处理的项目,以便最终用户知道任务的进度.

目前我的django应用程序和芹菜一起安装在一台服务器上,所以我可以使用Django的模型来报告状态,但我计划添加更多远离Django的工作人员,因此他们无法访问数据库.

现在我看到几个解决方案:

  • 使用某些存储手动存储中间结果,例如redis或mongodb,然后通过网络提供.这让我有点担心,因为如果我将使用redis然后我应该保持同步Django端的代码读取状态和Celery任务写入状态,所以他们使用相同的键.
  • 使用REST调用从芹菜返回Django的状态.喜欢PUT http://django.com/api/task/123/items_processed
  • 也许使用Celery事件系统并创建Item processeddjango更新计数器的事件
  • 创建一个单独的worker,它在一个带有django的服务器上运行,它拥有一个只增加items proceeded计数的任务,所以当任务完成时它会发出一个项目increase_messages_proceeded_count.delay(task_id).

我提到的那些是否有任何解决方案或隐藏的问题?

leh*_*ins 7

可能有很多方法可以实现您的目标,但这就是我将如何做到这一点.

在长期运行的芹菜任务中,使用django的缓存框架设置进度:

from django.core.cache import cache

@app.task()
def long_running_task(self, *args, **kwargs):
    key = "my_task: %s" % self.result.id
    ...
    # do whatever you need to do and set the progress
    # using cache:
    cache.set(key, progress, timeout="whatever works for you")
    ...
Run Code Online (Sandbox Code Playgroud)

然后,您所要做的就是使用该密钥生成重复的AJAX GET请求,并从缓存中检索进度.这些方面的东西:

 def task_progress_view(request, *args, **kwargs):
     key = request.GET.get('task_key')
     progress = cache.get(key)
     return HttpResponse(content=json.dumps({'progress': progress}),
                         content_type="application/json; charset=utf-8")
Run Code Online (Sandbox Code Playgroud)

下面是一个警告,如果您将服务器作为多个进程运行,请确保使用memcached之类的东西,因为django的本机缓存在进程之间会不一致.此外,我可能不会使用芹菜task_id作为钥匙,但它足以用于演示目的.


scy*_*ale 2

最简单:

您的任务和 django 应用程序已经共享访问一两个数据存储 - 代理和结果后端(如果您使用的数据存储与代理不同)

您可以简单地将一些数据放入这些数据存储中的一个或另一个中,以指示任务当前正在处理哪一项。

例如,如果使用 redis 只是有一个键“当前正在处理的任务”,并将与当前正在处理的项目相关的数据存储在其中。