Celery Worker 在当前任务完成后不会再接受新任务

Mat*_*t H 3 python django rabbitmq celery

我有三项任务:

@app.task(name='timey')
def timey():
    print "timey"
    while True:
        pass
    return 1

@app.task(name='endtimey')
def endtimey():
    for i in range(10):
        print "ENDTIMEY", time()
        sleep(3)
    return 1

@app.task(name='nexttask')
def nexttask(n):
    print "NEXT TASK"
    return 1
Run Code Online (Sandbox Code Playgroud)

如果我唯一做的就是将 endtimey 和 nexttask 链接在一起 -

chain(endtimey.s() | nexttask.s()).delay()

一切都按预期进行。我在芹菜日志中看到ENDTIMEY <current time>打印十次。NEXT TASK但是,如果我用无限任务填充 7 个工人timey,然后将其链接endtimey在一起nexttask-

for i in range(7):
    timey.s().delay()
chain(endtimey.s() | nexttask.s()).delay()
Run Code Online (Sandbox Code Playgroud)

所有timey任务将由 8 个工作线程中的 7 个接收,并endtimey在第 8 个工作线程上运行,之后日志将显示nexttask已收到,但nexttask不会运行。

为什么是这样?

另外,如果我杀死芹菜服务器然后重新启动它,这nexttask将是第一个运行的事情。

这是一个人为的示例,但我在更复杂的情况下遇到了一个问题,即芹菜工作人员在完成当前任务后不会拾取排队的任务。如果我在这种情况下重新启动 celery,自由工作人员将再次开始执行任务。

小智 6

听起来问题是 celery 的默认预取行为。每个工作人员将在当前达到最大容量时提前预留一定数量的任务,这称为预取乘数

这样做的原因是,当您有大量短任务时,如果任务已经预取并准备好立即执行,那么您的总体吞吐量将会高得多。

问题是,当您有大量长时间运行的任务或长短任务混合时,即使其他工作人员可以处理它,任务也可能会被繁忙的工作人员保留和阻塞。

因此,根据您的情况,您可能需要将预取乘数降低到 1。