如何在fastapi(uvicorn服务器)中创建调度程序

aru*_*aru 1 python scheduler fastapi uvicorn

我正在开发一项 python 服务,其中我必须创建一个 FastAPI 休息端点,并且应该在后台运行 2 个调度程序任务。

我们如何在 FastAPI 中实现调度程序?如果不可行,实现这种要求的最佳方法是什么,创建两个服务是正确的方法?

谢谢和问候,阿鲁

小智 5

我已经度过了困难时期,直到人工智能找到了对我有用的解决方案。\n您可以使用apscheduler

\n

让我向您展示一个简单的例子来说明它是如何工作的。

\n

在您的main.py或用于启动应用程序的其他模块中:

\n
from fastapi import FastAPI\n# Bellow the import create a job that will be executed on background\nfrom apscheduler.schedulers.background import BackgroundScheduler\n\n\napp = FastAPI()\napp.add_middleware(\n    CORSMiddleware,\n    allow_origins=["*"],\n    allow_credentials=True,\n    allow_methods=["*"],\n    allow_headers=["*"],\n)\n\n\ntest_list = ["1"] * 10\n    \n# Here you create the function do you want to schedule\n     \ndef check_list_len():\n    global test_list\n    logger.info(f"check_list_len\xef\xbc\x9a{len(test_list)}")\n\n@app.on_event(\'startup\')\ndef init_data():\n    scheduler = BackgroundScheduler()\n    scheduler.add_job(check_list_len, \'cron\', second=\'*/5\')\n    scheduler.start()\n
Run Code Online (Sandbox Code Playgroud)\n

因此,装饰器@app.on_event(\'startup\')将运行包含计划的函数init_data。\n在此示例中,计划将check_list_len每 5 秒运行一次函数。

\n

要查看更多用法示例,您可以在文档中搜索。

\n

GitHub 页面中,有一些使用示例。

\n

希望这对您有帮助!

\n