FastAPI python:如何在后台运行线程?

Gab*_*ies 17 python multithreading fastapi uvicorn

我正在使用 FastAPI 在 python 中创建一个服务器,并且我想要一个与我的 API 无关的函数,每 5 分钟在后台运行一次(例如从 API 检查内容并根据响应打印内容)

我尝试创建一个运行该函数的线程start_worker,但它不打印任何内容。

有谁知道该怎么做?

def start_worker():
    print('[main]: starting worker...')
    my_worker = worker.Worker()
    my_worker.working_loop() # this function prints "hello" every 5 seconds

if __name__ == '__main__':
    print('[main]: starting...')
    uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
    _worker_thread = Thread(target=start_worker, daemon=False)
    _worker_thread.start()
Run Code Online (Sandbox Code Playgroud)

Chr*_*ris 30

选项1

您应该在调用之前启动线程uvicorn.run,就像uvicorn.run阻塞线程一样。

import time
import threading
from fastapi import FastAPI
import uvicorn

app = FastAPI()
class BackgroundTasks(threading.Thread):
    def run(self,*args,**kwargs):
        while True:
            print('Hello')
            time.sleep(5)
  
if __name__ == '__main__':
    t = BackgroundTasks()
    t.start()
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)

您还可以使用 FastAPI 的启动事件来启动线程,只要在应用程序启动之前运行就可以。

@app.on_event("startup")
async def startup_event():
    t = BackgroundTasks()
    t.start()
Run Code Online (Sandbox Code Playgroud)

选项2

您可以使用重复事件调度程序来执行后台任务,如下所示:

import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn

app = FastAPI()
s = sched.scheduler(time.time, time.sleep)

def print_event(sc): 
    print("Hello")
    sc.enter(5, 1, print_event, (sc,))

def start_scheduler():
    s.enter(5, 1, print_event, (s,))
    s.run()

@app.on_event("startup")
async def startup_event():
    thread = Thread(target = start_scheduler)
    thread.start()

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)

选项3

如果您的任务是一个async def函数(有关FastAPI 中端点/后台任务的更多详细信息,请参阅此答案),那么您可以使用该函数将该任务添加到当前事件循环中。该函数接受一个协程对象(即函数)并返回一个对象(如果需要,可以用于任务,或者it 等)。该调用在当前线程的事件循环内创建任务,并在“后台”与事件循环中的所有其他任务同时执行,并在它们之间进行切换。defasync defasyncio.create_task()create_task()async defTaskawaitcancelawait

需要在调用 之前创建一个事件循环create_task(),并且在uvicorn以编程方式(例如使用uvicorn.run(app))或在终端中(例如使用uvicorn app:app)启动服务器时已经创建了事件循环。除了使用之外asyncio.create_task(),还可以使用asyncio.get_running_loop()来获取当前事件loop,然后调用loop.create_task()

下面的示例使用最近记录的方式来添加lifespan事件(使用上下文管理器),即应在应用程序启动之前以及应用程序关闭时执行的代码(请参阅文档以及答案以及此答案以获取更多详细信息和示例)。人们仍然可以使用startupshutdown事件,如前面的选项所示;但是,这些事件处理程序可能会从未来的 FastAPI/Starlette 版本中删除。

import time
import threading
from fastapi import FastAPI
import uvicorn

app = FastAPI()
class BackgroundTasks(threading.Thread):
    def run(self,*args,**kwargs):
        while True:
            print('Hello')
            time.sleep(5)
  
if __name__ == '__main__':
    t = BackgroundTasks()
    t.start()
    uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)

  • 子流程的弹性如何?BackgroundTasks 服务是否有可能被终止,如果是的话,FastAPI 的事件处理程序是否会自动重新启动它? (3认同)