我有
CELERY_CREATE_MISSING_QUEUES = TrueCELERY_QUEUESCELERY_DEFAULT_QUEUE = 'default' (直接类型)我看到自定义路由器返回的路由中的新队列被创建,我假设是因为CELERY_CREATE_MISSING_QUEUES.
现在在我运行的工作节点中,我没有传递-Q参数,它只从'default'队列消耗,这似乎与文档一致 -
默认情况下,它将使用CELERY_QUEUES设置中定义的所有队列(如果未指定,则默认为名为celery的队列).
有没有办法让我的工作节点从所有队列中消耗,包括动态创建的队列?
谢谢,
工作人员需要被告知这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并在创建它们时存储它们,或者rabbitmqctl list_queues如果您使用RabbitMQ作为代理来获取它们,例如添加一个信号处理程序,将这些动态队列添加到要使用的worker中.
例如使用celeryd_after_setup信号:
from celery.signals import celeryd_after_setup
@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
# get the dynamic queue, maybe stored somewhere
queue = 'dynamic_queue'
instance.app.amqp.queues.select_add(queue)
Run Code Online (Sandbox Code Playgroud)
如果始终创建新的动态队列,还可以命令worker在运行时从这些队列开始使用:
#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)
# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
Run Code Online (Sandbox Code Playgroud)
请参阅添加消费者.
我希望这有帮助,当我得到更多关于此的信息时,我会编辑问题.
| 归档时间: |
|
| 查看次数: |
2820 次 |
| 最近记录: |