Arz*_*dız 7 python schedule fastapi
我想使用 Python FASTAPI 编写一个每天下午 3:30 只运行一次的任务。我该怎么做?
我尝试过这个,但它一直有效。
schedule.every().day.at("15:30:00").do(job2)
while True:
schedule.run_all()
Run Code Online (Sandbox Code Playgroud)
mik*_*sus 12
以下是如何使用Rocketry来做到这一点。Rocketry 是一个基于语句的调度器,它与 FastAPI 集成良好。
假设您有一个scheduler.py. 这是您放置任务的地方。该文件的内容:
from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry()
# Create some tasks:
@app.task(daily.after("15:30"))
def do_things():
...
Run Code Online (Sandbox Code Playgroud)
然后是 FastAPI 应用程序,我们称之为api.py:
from fastapi import FastAPI
from scheduler import app as app_rocketry
app = FastAPI()
session = app_rocketry.session
# Create some routes:
@app.get("/my-route")
async def get_tasks():
# We can modify/read the Rocketry's runtime session
return session.tasks
@app.post("/my-route")
async def manipulate_session():
for task in session.tasks:
...
Run Code Online (Sandbox Code Playgroud)
然后将这main.py两者结合起来并运行它们:
import asyncio
import uvicorn
from api import app as app_fastapi
from scheduler import app as app_rocketry
class Server(uvicorn.Server):
"""Customized uvicorn.Server
Uvicorn server overrides signals and we need to include
Rocketry to the signals."""
def handle_exit(self, sig: int, frame) -> None:
app_rocketry.session.shut_down()
return super().handle_exit(sig, frame)
async def main():
"Run scheduler and the API"
server = Server(config=uvicorn.Config(app_fastapi, workers=1, loop="asyncio"))
api = asyncio.create_task(server.serve())
sched = asyncio.create_task(app_rocketry.serve())
await asyncio.wait([sched, api])
if __name__ == "__main__":
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
这两个应用程序可以无缝地相互通信。我还制作了一个模板,您可以克隆:https ://github.com/Miksus/rocketry-with-fastapi
相关链接:
小智 1
交换schedule.run_all()为schedule.run_pending(). 它应该有效!
import schedule
import time
def job():
print("I'm working...")
schedule.every().day.at("15:30:00").do(job)
while True:
schedule.run_pending()
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
17751 次 |
| 最近记录: |