如何在 Celery 中设置每个任务(或队列)的并发限制?

rzl*_*vmp 12 python concurrency celery

我的示例代码是:

\n
    \n
  • 运行.py
  • \n
\n
from celery import Celery, Task\n\napp = Celery(\n    "app",\n    task_serializer=\'json\',\n    accept_content=[\'json\'],\n    result_serializer=\'json\',\n    timezone=\'Asia/Tokyo\',\n    enable_utc=True,\n    backend=\'redis://127.0.0.1:6369/2\',\n    broker=\'redis://127.0.0.1:6369/3\',\n    include=[\'app.tasks\']\n)\n\napp.conf.task_routes = {\'app.tasks.long_run\': {\'queue\': \'long_running_task\'}}\n\n\nclass VerboseTask(Task):\n\n    def on_failure(self, exc, task_id, args, kwargs, einfo):\n        print("-------------------------------------------")\n        print(\n            f"FAILURE({task_id})\xef\xbc\x9a"\n            f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"\n        )\n        print("-------------------------------------------")\n\n    def on_success(self, retval, task_id, args, kwargs):\n        print("-------------------------------------------")\n        print(f"SUCCESS({task_id})\xef\xbc\x9a{self.request.task}{kwargs}")\n        print("-------------------------------------------")\n\n    def on_retry(self, exc, task_id, args, kwargs, einfo):\n        print("-------------------------------------------")\n        print(f"RETRY({task_id})\xef\xbc\x9a{self.request.task}{kwargs}")\n        print("-------------------------------------------")\n\n
Run Code Online (Sandbox Code Playgroud)\n
    \n
  • 应用程序/任务.py
  • \n
\n
from run import app, VerboseTask\nfrom time import sleep\n\n\nclass TSK:\n\n    name = \'tsk\'\n\n    @app.task(bind=True, base=VerboseTask)\n    def short_run(self, **kwargs):\n        for i in range(0, 3):\n            print(f\'{self.request.task} [{self.request.id[0:5]}] \xe2\x86\x92 {i*5} ~ {(i+1)*5}\')\n            sleep(5)\n        print(\'short_run finished\')\n\n    @app.task(bind=True, base=VerboseTask)\n    def long_run(self, **kwargs):\n        for i in range(0, 10):\n            print(f\'{self.request.task} [{self.request.id[0:5]}] \xe2\x86\x92 {i*5} ~ {(i+1)*5}\')\n            sleep(5)\n        print(\'long_run finished\')\n\n
Run Code Online (Sandbox Code Playgroud)\n

我用芹菜

\n
celery -A run worker -l info --concurrency=4 -Q long_running_task,celery\n
Run Code Online (Sandbox Code Playgroud)\n

问题是当我调用long_running_task四次然后执行常规celery任务时,会long_running_task消耗所有工作人员并且celery任务需要等待四个长时间运行的任务。

\n

我想保留至少 2 个工作人员来执行短期任务。例如,如果当前任务是两个短期运行任务和一个长期运行任务,则只有一个工作线程可用,并且只有短期运行任务可以运行。长时间运行的任务必须等待,而三个工作人员将同时空闲以消耗其中之一。

\n

换句话说,限制长时间运行任务的最大并发度也是2一种可能的解决方案。

\n

我知道我可能会运行两个具有独立并发性的 Celery 实例:

\n
celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task\ncelery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery\n
Run Code Online (Sandbox Code Playgroud)\n

\n
    \n
  1. 如果空闲的话,我也想消耗long_running_task工作人员来完成其他任务(这不是那么重要的条件,可能会被忽略)
  2. \n
  3. 我使用 docker 容器,并且只想在其中保留一个ENTRYPOINT(换句话说,我不想通过运行单独的命令来分隔工作人员。我只想执行celery -A run worker ...一次)
  4. \n
\n

有什么方法可以为满足我的条件的工作人员定制限制和并发数吗?

\n

MrE*_*MrE 0

您可以让一个工作人员从多个队列中消费。如果您有一个 high_priority 队列和一个 long_running 队列,则可以为 high_priority 队列设置一个专用工作线程,以确保始终有一个工作线程准备好接收这些任务,然后让其他工作线程同时消耗 high_priority 和 long_running 任务。

就入口点而言,只需为入口点使用 shell 脚本,它接受一个参数(要从中使用的队列列表),因此您只有 1 个 docker 映像,但有 2 个部署:1 个用于 high_priority 队列,1 个用于两个队列,以及您可以根据负载单独缩放每个。