芹菜任务路线未按预期工作

Dea*_*ada 6 python celery

我正在练习芹菜,我想将我的任务分配到特定的队列,但是无法正常工作

我的__init__.py

import os
import sys
from celery import Celery

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

sys.path.append(CURRENT_DIR)

app = Celery()
app.config_from_object('celery_config')
Run Code Online (Sandbox Code Playgroud)

我的celery_config.py

amqp = 'amqp://guest:guest@localhost:5672//'
broker_url = amqp
result_backend = amqp

task_routes = ([
    ('import_feed', {'queue': 'queue_import_feed'})
])
Run Code Online (Sandbox Code Playgroud)

我的tasks.py

from . import app

@app.task(name='import_feed')
def import_feed():
    pass
Run Code Online (Sandbox Code Playgroud)

我如何管理工人:

celery -A subscriber1.tasks worker -l info
Run Code Online (Sandbox Code Playgroud)

我的客户的__init__.py

import os
import sys
from celery import Celery

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

sys.path.append(CURRENT_DIR)

app = Celery()
app.config_from_object('celery_config')
Run Code Online (Sandbox Code Playgroud)

我的客户的celery_config.py

from kombu.common import Broadcast

amqp = 'amqp://guest:guest@localhost:5672//'
BROKER_URL = amqp
CELERY_RESULT_BACKEND = amqp
Run Code Online (Sandbox Code Playgroud)

然后在客户的外壳中尝试:

from publisher import app
result = app.send_task('import_feed')
Run Code Online (Sandbox Code Playgroud)

然后我的工人得到了任务?!我期望不会,因为我将其分配给了特定的队列。我在客户端尝试了以下命令,但我的工人没有收到任何任务,我希望它在第一个任务上已收到

result = app.send_task('import_feed', queue='queue_import_feed')
Run Code Online (Sandbox Code Playgroud)

好像我误解了路由部分中的某些内容。但是我真正想要的是import_feedqueue_import_feed在发送任务时指定了队列的情况下任务才能运行

Olu*_*ule 8

您可以更改工作程序处理的默认队列。

app.send_task('import_feed')将任务发送到celery队列。

app.send_task('import_feed', queue='queue_import_feed')将任务发送到queue_import_feed但您的工作人员仅处理celery队列中的任务。

要处理特定队列,请使用-Q开关

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed'
Run Code Online (Sandbox Code Playgroud)

编辑

为了将限制上send_task,使得工人对发生反应import_feed,只有当它与队列发布任务,你需要重写send_taskCelery,并且还提供了自定义的AMQP一个default_queue设置为None

反应器.py

from celery.app.amqp import AMQP
from celery import Celery

class MyCelery(Celery):
    def send_task(self, name=None, args=None, kwargs=None, **options):
        if 'queue' in options:
            return super(MyCelery, self).send_task(name, args, kwargs, **options)


class MyAMQP(AMQP):
    default_queue = None
Run Code Online (Sandbox Code Playgroud)

celery_config.py

from kombu import Exchange, Queue

...

task_exchange = Exchange('default', type='direct')
task_create_missing_queues = False

task_queues = [
    Queue('feed_queue', task_exchange, routing_key='feeds'),
]

task_routes = {
    'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'}
}
Run Code Online (Sandbox Code Playgroud)

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP')
Run Code Online (Sandbox Code Playgroud)