小编tar*_*ueh的帖子

如何在运行时创建celery队列,以便发送到该队列的任务被工作人员接收?

我正在使用django 1.4,芹菜3.0,rabbitmq

为了描述这个问题,我在系统中有很多内容网络,我想要一个队列来处理与这些网络相关的任务.

但是,当系统处于活动状态时,内容会立即创建,因此我需要动态创建队列并让现有工作人员开始接收它们.

我尝试以下列方式安排任务(其中内容是django模型实例):

queue_name = 'content.{}'.format(content.pk)
# E.g. queue_name = content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba
add_content.apply_async(args=[content], queue=queue_name)
Run Code Online (Sandbox Code Playgroud)

这将创建一个带有名称的队列,content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba使用名称content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba和路由键创建一个新的交换,content.0c3a92a4-3472-47b8-8258-2d6c8a71e3ba并将任务发送到该队列.

但是,我从未看到工人们接受这些任务.我当前设置的工作人员没有侦听任何特定的队列(没有使用队列名称初始化),而是接收发送到默认队列的任务.我的芹菜设置是:

BROKER_URL = "amqp://test:password@localhost:5672/vhost"
CELERY_TIMEZONE = 'UTC'
CELERY_ALWAYS_EAGER = False

from kombu import Exchange, Queue

CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_DEFAULT_ROUTING_KEY = 'default'

CELERY_QUEUES = (
    Queue(CELERY_DEFAULT_QUEUE, Exchange(CELERY_DEFAULT_EXCHANGE),
        routing_key=CELERY_DEFAULT_ROUTING_KEY),
)

CELERY_CREATE_MISSING_QUEUES = True
CELERYD_PREFETCH_MULTIPLIER = 1
Run Code Online (Sandbox Code Playgroud)

知道如何让工作人员接收发送到这个新创建的队列的任务吗?

django queue worker amqp celery

2
推荐指数
1
解决办法
5515
查看次数

标签 统计

amqp ×1

celery ×1

django ×1

queue ×1

worker ×1