芹菜工人:如何从所有队列消费?

ksr*_*ini 6 python celery

我有

  • CELERY_CREATE_MISSING_QUEUES = True
  • 没有定义的 CELERY_QUEUES
  • 定义CELERY_DEFAULT_QUEUE = 'default' (直接类型)
  • 一个自定义路由器类,可以动态创建路由,如此票证所示(https://github.com/celery/celery/issues/150).

我看到自定义路由器返回的路由中的新队列被创建,我假设是因为CELERY_CREATE_MISSING_QUEUES.

现在在我运行的工作节点中,我没有传递-Q参数,它只从'default'队列消耗,这似乎与文档一致 -

默认情况下,它将使用CELERY_QUEUES设置中定义的所有队列(如果未指定,则默认为名为celery的队列).

有没有办法让我的工作节点从所有队列中消耗,包括动态创建的队列?

谢谢,

Pie*_*rre 5

工作人员需要被告知这些自动或动态创建的队列,因此您需要一种方法来获取这些队列名称并在创建它们时存储它们,或者rabbitmqctl list_queues如果您使用RabbitMQ作为代理来获取它们,例如添加一个信号处理程序,将这些动态队列添加到要使用的worker中.

例如使用celeryd_after_setup信号:

from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def add_dynamic_queue(sender, instance, **kwargs):
    # get the dynamic queue, maybe stored somewhere
    queue = 'dynamic_queue'
    instance.app.amqp.queues.select_add(queue)
Run Code Online (Sandbox Code Playgroud)

如果始终创建新的动态队列,还可以命令worker在运行时从这些队列开始使用:

#command all workers to consume from the 'dynamic_queue' queue
app.control.add_consumer('dynamic_queue', reply=True)

# command specific workers
app.control.add_consumer('dynamic_queue', reply=True, destination=[w1@example])
Run Code Online (Sandbox Code Playgroud)

请参阅添加消费者.

我希望这有帮助,当我得到更多关于此的信息时,我会编辑问题.