J. *_*Doe 5 python celery celery-task
我有一些代码将大量(1000s)芹菜任务排队,例如,假设是这样的:
for x in xrange(2000):
example_task.delay(x)
Run Code Online (Sandbox Code Playgroud)
有没有更好/更有效的方法来一次排队大量任务?他们都有不同的论点。
调用大量任务对您的 celery 工人来说是不健康的。此外,如果您正在考虑收集调用任务的结果,那么您的代码将不是最佳的。
您可以按特定大小批量处理您的任务。考虑下面链接中提到的例子。
http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks
当我们想使用 Celery 处理数百万个 PDF 时,我们也遇到了这个问题。我们的解决方案是编写一些我们称之为CeleryThrottle. 基本上,您可以使用所需的 Celery 队列和您想要的任务数量来配置节流阀,然后在循环中创建任务。当您创建任务时,限制器将监视实际队列的长度。如果它耗尽得太快,它会在一段时间内加速循环,以便将更多任务添加到队列中。如果队列变得太大,它会减慢循环速度并让某些任务完成。
这是代码:
\n\nclass CeleryThrottle(object):\n """A class for throttling celery."""\n\n def __init__(self, min_items=100, queue_name=\'celery\'):\n """Create a throttle to prevent celery run aways.\n\n :param min_items: The minimum number of items that should be enqueued. \n A maximum of 2\xc3\x97 this number may be created. This minimum value is not \n guaranteed and so a number slightly higher than your max concurrency \n should be used. Note that this number includes all tasks unless you use\n a specific queue for your processing.\n """\n self.min = min_items\n self.max = self.min * 2\n\n # Variables used to track the queue and wait-rate\n self.last_processed_count = 0\n self.count_to_do = self.max\n self.last_measurement = None\n self.first_run = True\n\n # Use a fixed-length queue to hold last N rates\n self.rates = deque(maxlen=15)\n self.avg_rate = self._calculate_avg()\n\n # For inspections\n self.queue_name = queue_name\n\n def _calculate_avg(self):\n return float(sum(self.rates)) / (len(self.rates) or 1)\n\n def _add_latest_rate(self):\n """Calculate the rate that the queue is processing items."""\n right_now = now()\n elapsed_seconds = (right_now - self.last_measurement).total_seconds()\n self.rates.append(self.last_processed_count / elapsed_seconds)\n self.last_measurement = right_now\n self.last_processed_count = 0\n self.avg_rate = self._calculate_avg()\n\n def maybe_wait(self):\n """Stall the calling function or let it proceed, depending on the queue.\n\n The idea here is to check the length of the queue as infrequently as \n possible while keeping the number of items in the queue as closely \n between self.min and self.max as possible.\n\n We do this by immediately enqueueing self.max items. After that, we \n monitor the queue to determine how quickly it is processing items. Using \n that rate we wait an appropriate amount of time or immediately press on.\n """\n self.last_processed_count += 1\n if self.count_to_do > 0:\n # Do not wait. Allow process to continue.\n if self.first_run:\n self.first_run = False\n self.last_measurement = now()\n self.count_to_do -= 1\n return\n\n self._add_latest_rate()\n task_count = get_queue_length(self.queue_name)\n if task_count > self.min:\n # Estimate how long the surplus will take to complete and wait that\n # long + 5% to ensure we\'re below self.min on next iteration.\n surplus_task_count = task_count - self.min\n wait_time = (surplus_task_count / self.avg_rate) * 1.05\n time.sleep(wait_time)\n\n # Assume we\'re below self.min due to waiting; max out the queue.\n if task_count < self.max:\n self.count_to_do = self.max - self.min\n return\n\n elif task_count <= self.min:\n # Add more items.\n self.count_to_do = self.max - task_count\n return\nRun Code Online (Sandbox Code Playgroud)\n\n我们这样使用它:
\n\nthrottle = CeleryThrottle(min_items=30, queue_name=queue)\nfor item in items:\n throttle.maybe_wait()\n do_something.delay()\nRun Code Online (Sandbox Code Playgroud)\n\n所以它使用起来非常简单,而且它很好地保持了队列在一个合适的位置\xe2\x80\x94,不太长,也不太短。它保持队列耗尽速率的滚动平均值,并且可以相应地调整自己的计时器。
\n| 归档时间: |
|
| 查看次数: |
4398 次 |
| 最近记录: |