如何在特定虚拟主机上处理 Celery 任务?

Cer*_*rin 5 python django celery

我有一个 Celery 任务,例如:

from celery.task import task
from django.conf import settings
from base.tasks import BaseTask

@task(name="throw_exception", base=BaseTask)
def print_value(*args, **kwargs):
    print('BROKER_URL:', settings.BROKER_URL)
Run Code Online (Sandbox Code Playgroud)

我正在我的 virtualenv 中运行一个 Celery 工作线程,如下所示:

celery worker -A myproject -l info
Run Code Online (Sandbox Code Playgroud)

工人显示:

Connected to amqp://guest:**@127.0.0.1:5672/myapp
Run Code Online (Sandbox Code Playgroud)

当我从 Django shell 启动任务时:

>>> from django.conf import settings
>>> settings.BROKER_URL
'amqp://guest:**@127.0.0.1:5672/myapp'
>>> from myapp.tasks import print_value
>>> print_value.delay()
Run Code Online (Sandbox Code Playgroud)

我从未在工作人员日志中看到执行的任务。

但是,如果我将工作线程更改为使用带有默认“/”虚拟主机的 BROKER_URL,那么它会立即执行所有待处理的任务,这意味着print_value.delay()即使设置了正确的 BROKER_URL,我的所有调用也会将其发送到错误的虚拟主机。我究竟做错了什么?

编辑:问题似乎是 Celery 没有一致的 @task 装饰器,并且通过使用错误的装饰器,您会断开任务与代理设置的连接。因此,本质上,我的所有任务都配置为使用默认代理,而不是我的设置中定义的代理。旧文档说要使用,from celery.task import task但新文档...并没有真正指定,并且似乎暗示您应该使用文件app中定义的实例celery.py,例如@app.task. 问题是我的所有任务都在单独的tasks.py文件中,他们无法访问app实例。如果我将一个任务复制到我的任务中celery.py并使用@app.task装饰器,那么它会使用正确的虚拟主机并按预期工作,但显然,这不是一个实际的解决方案,因为我必须将数十个函数复制到此文件中。我该如何正确解决这个问题?

Ykh*_*Ykh 0

只需提供一个使用 djcelery 与不同 vhost 进行测试的演示。

在您的项目文件夹中__init__.py

from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
Run Code Online (Sandbox Code Playgroud)

celery.py,替换SchoolMS为您自己的项目标签:

from __future__ import absolute_import
import os
from celery import Celery, platforms
from django.conf import settings

# set the default Django settings module for the 'celery' program.  
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SchoolMS.settings')

app = Celery('SchoolMS')
platforms.C_FORCE_ROOT = True

# Using a string here means the worker will not have to  
# pickle the object when using Windows.  
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Run Code Online (Sandbox Code Playgroud)

settings.py:

BROKER_URL = 'amqp://schoolms:schoolms@localhost:5672/schoolms'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_SCHEDULE = {
}
Run Code Online (Sandbox Code Playgroud)

user/tasks.py:

from celery import task

from django.conf import settings


@task
def send_tel_verify(tel_verify_id):
    try:
        tel_verify = TelVerify.objects.get(id=tel_verify_id)
        try:
            send_sms(tel_verify.tel, 'xxxx')
            return ''success'
        except SmsError as e:
            return 'error'
    except ObjectDoesNotExist:
        return 'not found'
Run Code Online (Sandbox Code Playgroud)

user/views.py

send_tel_verify.delay(tel_verify.id)
Run Code Online (Sandbox Code Playgroud)