在Flask-Celery中收到类型为""的未注册任务

the*_*ner 2 python celery flask

我正在尝试将芹菜整合到我的应用程序中,但我收到此错误说Received unregistered task of type "". The message has been ignored and discarded.我的Celery应用程序实例正在创建如下:

from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
Run Code Online (Sandbox Code Playgroud)

我的任务文件是这样的:

from flask import current_app
from .. import celery

from ..models.models import MobileRedemption


@celery.task(name='process_new_redemption')
def task_process_new_redemption(red_id):
    redemption = MobileRedemption.objects(id=red_id).first()

    if redemption:
        assert isinstance(redemption, MobileRedemption)
    print ("Redemption Successful.....!")


@celery.task(name='process_delete_redemption')
def task_delete_redemption(red_id):
    current_app.logger.info("reached here!")
    redemption = MobileRedemption.objects(id=red_id).first()
    print(redemption)
    redemption.delete()
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?

Ita*_*ayB 6

在你的Celery构造中你应该include你的任务文件:

celery = Celery(app.import_name, 
                broker=app.config['CELERY_BROKER_URL'],
                include=['path.to.tasks'])
Run Code Online (Sandbox Code Playgroud)