我对此做了很多研究,包括尝试这样的答案。看来 Celery 无法访问我的 Flask 应用程序的上下文。
我完全了解我的 celery 对象,它将装饰我的任务,必须能够访问我的 Flask 应用程序的上下文。我确实相信它应该,因为我按照本指南创建了我的芹菜对象。我不确定混淆是否存在于我使用 Flask-HTTPAuth 的事实中。
这是我所拥有的一些内容。
def make_celery(app):
celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
items = g.user.items
for item in items: …Run Code Online (Sandbox Code Playgroud)