Celery throws BacklogLimitExceeded


I start a task that periodically updates its state, and I watch the result from the caller. After the second polling cycle on the caller side, however, the program throws a BacklogLimitExceeded exception (the task itself completes successfully a while later).

The caller side:

    task = signature("worker.taskname", args=(url,), queue="worker")
    g = group(task).apply_async()
    while not g.ready():
        print(g[0].result)
        time.sleep(5)

The task side:

    with open(filename, "wb") as w:
        fd = stream.open()
        while True:
            data = fd.read(2048)
            if data:
                w.write(data)
                size = w.tell()
                # taskname.update_state(meta={'size': size})
            else:
                break

(If I comment out the update_state line, everything works fine.)

I'm using RabbitMQ as both broker and result backend on Ubuntu 14.04. Any idea how to fix this?

Here is the exact traceback:

Traceback (most recent call last):
  File "main.py", line 55, in <module>
    while not g.ready():
  File "python3.4/site-packages/celery/result.py", line 503, in ready
    return all(result.ready() for result in self.results)
  File "python3.4/site-packages/celery/result.py", line 503, in <genexpr>
    return all(result.ready() for result in self.results)
  File "python3.4/site-packages/celery/result.py", line 259, in ready
    return self.state in self.backend.READY_STATES
  File "python3.4/site-packages/celery/result.py", line 394, in state
    return self._get_task_meta()['status']
  File "python3.4/site-packages/celery/result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "python3.4/site-packages/celery/backends/amqp.py", line 180, in get_task_meta
    raise self.BacklogLimitExceeded(task_id)
celery.backends.amqp.BacklogLimitExceeded: 0a4fb653-0f05-48dc-ac43-fb0c8fbaba9a

Answer (小智):

I recently hit this error while using Redis as the backend and dug into it. The error occurs when the backend has accumulated more than 1000 status messages for a task; once the polling loop reaches this default limit, the exception is raised.

There are a few knobs that may help; result_expires is one of them. You can also raise the limit above 1000.
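As a sketch, the relevant settings could go in a celeryconfig module along these lines (setting names are from the Celery configuration docs; the exact effect of each depends on your Celery version and backend, so treat this as a starting point, not a definitive fix):

```python
# celeryconfig.py (sketch)

# Expire stored results/state messages so they don't pile up
# in the backend indefinitely (seconds, or a timedelta).
result_expires = 3600

# Alternatively, switch the result backend away from the amqp
# backend, which is the one raising BacklogLimitExceeded here;
# e.g. Redis (hypothetical local instance shown).
# result_backend = 'redis://localhost:6379/0'
```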

http://docs.celeryproject.org/en/latest/userguide/configuration.html#redis-backend-settings