Python FASTAPI调度任务

Arz*_*dız 7 python schedule fastapi

我想使用 Python FASTAPI 编写一个每天下午 3:30 只运行一次的任务。我该怎么做?

我尝试过这个,但它一直有效。

schedule.every().day.at("15:30:00").do(job2)

while True:
    schedule.run_all()
Run Code Online (Sandbox Code Playgroud)

mik*_*sus 12

以下是如何使用Rocketry来做到这一点。Rocketry 是一个基于语句的调度器,它与 FastAPI 集成良好。

假设您有一个scheduler.py. 这是您放置任务的地方。该文件的内容:

from rocketry import Rocketry
from rocketry.conds import daily

app = Rocketry()

# Create some tasks:

@app.task(daily.after("15:30"))
def do_things():
    ...
Run Code Online (Sandbox Code Playgroud)

然后是 FastAPI 应用程序,我们称之为api.py

from fastapi import FastAPI
from scheduler import app as app_rocketry

app = FastAPI()
session = app_rocketry.session

# Create some routes:

@app.get("/my-route")
async def get_tasks():
    # We can modify/read the Rocketry's runtime session
    return session.tasks

@app.post("/my-route")
async def manipulate_session():
    for task in session.tasks:
        ...
Run Code Online (Sandbox Code Playgroud)

然后将这main.py两者结合起来并运行它们:

import asyncio
import uvicorn

from api import app as app_fastapi
from scheduler import app as app_rocketry

class Server(uvicorn.Server):
    """Customized uvicorn.Server

    Uvicorn server overrides signals and we need to include
    Rocketry to the signals."""
    def handle_exit(self, sig: int, frame) -> None:
        app_rocketry.session.shut_down()
        return super().handle_exit(sig, frame)


async def main():
    "Run scheduler and the API"
    server = Server(config=uvicorn.Config(app_fastapi, workers=1, loop="asyncio"))

    api = asyncio.create_task(server.serve())
    sched = asyncio.create_task(app_rocketry.serve())

    await asyncio.wait([sched, api])

if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

这两个应用程序可以无缝地相互通信。我还制作了一个模板,您可以克隆:https ://github.com/Miksus/rocketry-with-fastapi

相关链接:


小智 1

交换schedule.run_all()schedule.run_pending(). 它应该有效!

import schedule
import time

def job():
    print("I'm working...")

schedule.every().day.at("15:30:00").do(job)

while True:
    schedule.run_pending()
Run Code Online (Sandbox Code Playgroud)