the*_*ner 2 python celery flask
我正在尝试将芹菜整合到我的应用程序中,但我收到此错误说Received unregistered task of type "". The message has been ignored and discarded.我的Celery应用程序实例正在创建如下:
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
Run Code Online (Sandbox Code Playgroud)
我的任务文件是这样的:
from flask import current_app
from .. import celery
from ..models.models import MobileRedemption
@celery.task(name='process_new_redemption')
def task_process_new_redemption(red_id):
redemption = MobileRedemption.objects(id=red_id).first()
if redemption:
assert isinstance(redemption, MobileRedemption)
print ("Redemption Successful.....!")
@celery.task(name='process_delete_redemption')
def task_delete_redemption(red_id):
current_app.logger.info("reached here!")
redemption = MobileRedemption.objects(id=red_id).first()
print(redemption)
redemption.delete()
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?
在你的Celery构造中你应该include你的任务文件:
celery = Celery(app.import_name,
broker=app.config['CELERY_BROKER_URL'],
include=['path.to.tasks'])
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
725 次 |
| 最近记录: |