在Celery中,如何阻止长时间延迟的任务阻止新的任务?

use*_*316 11 python scheduling rabbitmq celery

我有两种任务.任务A由celerybeat每小时生成一次.它立即运行,并生成任务B的一千(或几千)个实例,每个实例都有一天的ETA.

启动时,任务A的实例运行并生成一千个B.从那以后,没有任何事情发生.我应该看到另一个A每小时运行,还有另外一千个B.但实际上我什么也看不见.

在冻结时,rabbitmqctl显示1000条消息,968准备好,32未确认.一小时后,有1001条消息,969准备好,32未确认.等等,每隔一小时就有一条新消息被归类为准备就绪.据推测,正在发生的事情是,工人正在预取32条消息但不能对它们采取行动,因为他们的ETA仍然在未来.与此同时,现在应该运行的新任务无法运行.

处理这个问题的正确方法是什么?我猜我需要多个工作人员,也许还有多个队列(但我不确定后一点).有更简单的方法吗?我试过摆弄CELERYD_PREFETCH_MULTIPLIER和-Ofail(如这里讨论的那样:http://celery.readthedocs.org/en/latest/userguide/optimizing.html )但是无法让它去.我的问题和这个问题一样:[[Django Celery]] Celery阻止做IO任务

在任何情况下:我只能解决这个问题,因为我对任务的性质及其时间有很多了解.使用未来ETA的足够任务是否可以锁定整个系统似乎不是一个设计缺陷?如果我等待几个小时,然后杀死并重新启动工作程序,它会再次抓取前32个任务并冻结,即使此时队列中的任务已准备好立即运行.某些组件是否应该足够聪明才能查看ETA​​并忽略不可运行的任务?

附录:我现在认为当RabbitMQ 3.3与Celery 3.1.0一起使用时,问题是一个已知的错误.有关详细信息,请访问:https://groups.google.com/forum/#!searchin/celery-users/countdown | sort: date/accelery-users/FiAAESOzezA/499OH-pylacJ

更新到Celery 3.1.1后,事情似乎更好.任务A每小时运行一次(好吧,它有几个小时)并安排任务B的副本.这些似乎正在填补工作人员:未确认消息的数量继续增长.我必须看看它能不受限制地增长.

Kyl*_*ens 2

看来这是一个可以通过路由解决的问题: http://celery.readthedocs.org/en/latest/userguide/routing.html

使用路由时,您可以拥有多个填充不同类型任务的队列。如果您希望任务 B 不阻塞更多任务 A,您可以将它们放入具有不同优先级的单独工作队列中,这样您的工作人员将在充满任务 B 的大队列上工作,但当任务 A 到达时,它会被下一个可用的任务拉取工人。

这样做的额外好处是,您还可以将更多工作人员分配给严重填充的队列,并且这些工作人员只会从指定的高容量队列中拉出。