芹菜任务被“接收”意味着什么?当所有 celery 工作线程都被阻塞时,未“收到”的新任务会发生什么情况?

Zac*_*ary 3 python celery flower

我正在开发一个新的监控系统,该系统可以测量 Celery 队列吞吐量,并在队列备份时帮助向团队发出警报。在我的工作过程中,我遇到了一些我不理解的奇怪行为(并且在 Celery 规范中没有详细记录)。

出于测试目的,我设置了一个端点,该端点将使用 16 个可用于模拟备份队列的长时间运行的任务填充队列。框架是Flask,队列代理是Redis。Celery 配置为每个工作人员可以并行处理最多 4 个任务,而我有 2 个工作人员正在运行。

api/health.py

def health():
    health = Blueprint("health", __name__)

    @health.route("/api/debug/create-long-queue", methods=["GET"])
    def long_queue():
        for i in range(16):
            sleepy_job.delay()

        return make_response({}, 200)

    return health
Run Code Online (Sandbox Code Playgroud)

工作.py

@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
    time.sleep(30)
Run Code Online (Sandbox Code Playgroud)

以下是我模拟备份生产队列的方法:

  1. 我打电话/api/debug/create-long-queue来模拟队列中的备份。根据上面的计算,每个工人应该忙着睡觉 1 分钟(总共可以同时处理 8 个任务。每个任务只休眠 30 秒,总共 16 个任务。)
  2. 不久后(< 5 秒)我进行了另一个 API 调用,这会启动具有实际业务逻辑的不同工作(处理入站 Webhook API 调用)。我们称这个工作为handle_incoming_message

这是我看到的使用花来检查队列的内容:

  • 虽然所有工作线程都被前 8 个任务阻塞,但我在队列中看 sleepy_job不到新任务的迹象 ,尽管我确信第二个 API 调用已被调用。handle_incoming_messagehandle_incoming_message.delay()
  • 前 8 个sleepy_job任务完成后(约 30 秒),我 handle_incoming_message在队列中看到新任务,状态为RECIEVED
  • 在第二个(也是最后一个)8 个sleepy_job任务完成后,我现在看到handle_incoming_message了状态STARTED(当 UI 使用该任务中接收和处理的新数据进行更新时,我可以确认这一点。)

问题

因此,很明显,当工作人员在处理完前 8 个sleepy_job任务后暂时解除阻塞时,他们正在以一种可见的方式做一些事情来标记/确认新任务。但这留下了几个悬而未决的问题:handle_incoming_message

  • handle_incoming_message当worker被阻塞时,新任务的状态如何?
  • 工作人员解除封锁后发生了哪些变化,使得它现在可以了解新handle_incoming_message任务?
  • “已接收”状态实际上意味着什么?
  • (奖励:如何在工作人员被阻塞时了解排队的任务?)

Dej*_*kic 7

  1. 当所有工作人员都被阻止时,某些任务可能由于预取而处于已接收状态(请参阅文档)。因此,您的任务很可能只是在队列中,等待 Celery 工作进程(协调进程 - 这些不是实际的工作进程)接收。

  2. Flower 是一项简单的服务,它基于称为“任务事件”的 Celery 功能构建。简单来说,它(Flower)将自己订阅为所有事件(已接收、成功、启动、失败等)的接收者,然后以可视方式将这些事件呈现给 Web 客户端。更多相关信息请参见此处。因此,当 Celery 工作线程收到任务时,就会发送“任务已收到”事件。Flower 获取此事件,并更改仪表板中该任务的状态。

  3. 当一个任务被“接收”时,这意味着特定的 Celery 工作线程将该任务从队列中取出,并且它可以立即执行(如果有空闲的工作进程来执行它),或者 Celery 工作线程将等待工作进程成为准备运行任务。我已经提到过预取 - Celery 工作进程通常会执行比可用工作进程更多的任务。

  4. Celery 没有为用户提供列出特定队列中的内容的方法。这就是为什么您会看到许多类似的问题,包括这个提供答案的问题。您会在那里看到我的简短回答。简而言之,这取决于您选择的经纪人。如果是 Redis,那么您只需浏览对象列表即可。如果是 RabbitMQ 那么你可以使用他们的工具来检查队列。我认为不提供此信息的决定是好的,因为此信息永远不可靠。当您列出特定队列中的所有任务时,可能会有数千个新任务......