芹菜 - 完成任务的召唤功能

ele*_*nor 11 python django rabbitmq celery

我正在使用带有django和rabbitmq的芹菜来创建一个消息队列.我也有一个工人,它来自不同的机器.在django视图中,我正在开始这样的过程:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")
Run Code Online (Sandbox Code Playgroud)

我的任务配置如下:

class MyTask(Task):
  def run(self, args):
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out
Run Code Online (Sandbox Code Playgroud)

我现在的问题是,代理(我的django项目)现在如何接收工作人员在其计算机上执行的"ls -l"命令的输出.我想最好的事情是,只要工作人员准备好从执行的命令发送输出,就可以在代理中调用函数.

我想异步接收worker的输出,然后用输出更新网页,但那是另一次.现在我只想收到工人的输出.

更新

现在我添加了一个HTTP GET请求,该请求在任务结束时触发,通知Web应用程序任务已完成 - 我也在http GET中发送task_id.http GET方法调用django视图,它创建AsyncResult并获取结果,但问题是当调用result.get()时,我收到以下错误:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"
Run Code Online (Sandbox Code Playgroud)

有什么想法吗?我没有使用数据库,因为我正在使用带有AMQP的rabbitmq.

更新.

我非常想使用第三个选项,这似乎是最好的选择 - 对于小的和大的返回值.我的整个任务看起来像这样:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r
Run Code Online (Sandbox Code Playgroud)

所以我重写了after_return函数,它也应该释放对我的任务的锁定,因为任务的run()函数已经返回了一个值.在HEAD请求中,我基本上调用了一个django函数,它在task_id上调用AsyncResult,它应该提供任务的结果.在我的情况下,我已经将任意结果用于测试目的,因为它仅用于测试.

我想知道为什么上面的代码不起作用.我可以使用on_success,但我不认为它会有所作为 - 或者它会吗?

ast*_*vic 15

如果你看这里你会发现以下内容:

Django-celery使用MySQL来跟踪所有任务/结果,rabbit-mq基本上用作通信总线.

真正发生的是您ASyncResult在任务仍在运行时尝试获取工作者(该任务调用了对您的服务器的HTTP请求,并且由于它尚未返回,因此来自该工作者的数据库锁定会话仍处于活动状态并且结果行仍然被锁定).当Django尝试读取任务结果(其状态和运行函数的实际返回值)时,它会发现行被锁定并向您发出警告.

有几种方法可以解决这个问题:

  1. 设置另一个芹菜任务以获得结果并将其链接到您的处理任务.这样原始任务就会完成,释放对db的锁定,新的将获取它,在django中读取结果并做任何你需要它做的事情.在这上面查看芹菜文档.

  2. 根本不打扰,只需对Django进行POST,并将完整的处理结果作为有效负载附加,而不是尝试通过db获取它.

  3. 覆盖任务类中的on_success并将通知请求POST到Django,然后在db表上释放锁.

请注意,在返回run方法(可能是pickle)时,您需要存储整个处理结果(无论它有多大).你没有提到结果有多大,所以实际上只做上面的场景#2(这就是我要做的)可能是有意义的.或者我会选择#3.另外,不要忘记在任务中处理on_failure方法.