运行环境：Windows 10 Pro，在 venv 虚拟环境中使用 Python 3.10。
我正在尝试按照以下教程将 Celery 集成到 Flask 中：https://flask.palletsprojects.com/en/latest/patterns/celery/
# example.py
from celery import Celery, Task
from flask import Flask
def celery_init_app(app: Flask) -> Celery:
    """Create a Celery application bound to the given Flask application.

    Every task of the returned Celery app is executed inside
    ``app.app_context()``.

    Args:
        app: The Flask application. ``app.config["CELERY"]`` is read here,
            so that key must be set before this function is called.

    Returns:
        The configured Celery application. The same object is also stored
        in ``app.extensions["celery"]``.
    """

    class FlaskTask(Task):
        """Task base class that wraps each run in the Flask app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Enter the application context of the enclosing ``app`` before
            # delegating to the task body.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    # Keep a reference on the Flask app so it can be looked up later via
    # ``app.extensions["celery"]``.
    app.extensions["celery"] = celery_app
    return celery_app
def create_app() -> Flask:
app = Flask(__name__)
app.config.from_mapping(
CELERY=dict(
# Redis Docker container connection string
broker_url="redis://default:redispw@localhost:55000",
result_backend="redis://default:redispw@localhost:55000",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()
celery_init_app(app)
return …Run Code Online (Sandbox Code Playgroud)