如何告诉 celery 工人停止接受任务?如何检查是否有 celery 工作任务正在运行?

cod*_*ape 4 python linux upstart celery continuous-deployment

场景:

  • 系统在服务器上运行,由 Python/Flask Web 应用程序和使用 Celery 的后台任务组成
  • Web 应用程序和 celery 工作线程都作为新贵作业运行(Nginx 后面的 Web 应用程序)
  • 部署到生产是通过以下脚本完成的:

    • 停止新贵工作
    • 推送代码到服务器
    • 运行任何数据库迁移
    • 开始新贵工作

如何增强部署脚本以使其执行以下操作?:

  • 告诉celery工人停止接受任务
  • 等待当前正在运行的所有 celery 任务完成
  • 停止新贵工作
  • 推送代码到服务器
  • 运行任何数据库迁移
  • 开始新贵工作

cod*_*ape 6

作为部署的一部分运行的以下脚本解决了该问题:

import time
from celery.app.control import Control
from myapp.tasks import celery # my application's Celery app

if __name__ == "__main__":
    control = Control(celery)
    control.cancel_consumer("celery") # queue name, must probably be specified once per queue, but my app uses a single queue

    inspect = control.inspect()
    while True:
        active = inspect.active()
        running_jobs = []
        for key, value in active.items():
            running_jobs.extend(value)
        if len(running_jobs) > 0:
            print("{} jobs running: {}".format(len(running_jobs), ", ".join(job["name"] for job in running_jobs)))
            time.sleep(10)
        else:
            print("No running jobs")
            break
Run Code Online (Sandbox Code Playgroud)