Gab*_*ies 17 python multithreading fastapi uvicorn
我正在使用 FastAPI 在 python 中创建一个服务器,并且我想要一个与我的 API 无关的函数,每 5 分钟在后台运行一次(例如从 API 检查内容并根据响应打印内容)
我尝试创建一个运行该函数的线程start_worker,但它不打印任何内容。
有谁知道该怎么做?
def start_worker():
print('[main]: starting worker...')
my_worker = worker.Worker()
my_worker.working_loop() # this function prints "hello" every 5 seconds
if __name__ == '__main__':
print('[main]: starting...')
uvicorn.run(app, host="0.0.0.0", port=8000, reload=True)
_worker_thread = Thread(target=start_worker, daemon=False)
_worker_thread.start()
Run Code Online (Sandbox Code Playgroud)
Chr*_*ris 30
您应该在调用之前启动线程uvicorn.run,就像uvicorn.run阻塞线程一样。
import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)
您还可以使用 FastAPI 的启动事件来启动线程,只要在应用程序启动之前运行就可以。
@app.on_event("startup")
async def startup_event():
t = BackgroundTasks()
t.start()
Run Code Online (Sandbox Code Playgroud)
您可以使用重复事件调度程序来执行后台任务,如下所示:
import sched, time
from threading import Thread
from fastapi import FastAPI
import uvicorn
app = FastAPI()
s = sched.scheduler(time.time, time.sleep)
def print_event(sc):
print("Hello")
sc.enter(5, 1, print_event, (sc,))
def start_scheduler():
s.enter(5, 1, print_event, (s,))
s.run()
@app.on_event("startup")
async def startup_event():
thread = Thread(target = start_scheduler)
thread.start()
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)
如果您的任务是一个async def函数(有关FastAPI 中端点/后台任务的更多详细信息,请参阅此答案),那么您可以使用该函数将该任务添加到当前事件循环中。该函数接受一个协程对象(即函数)并返回一个对象(如果需要,可以用于任务,或者it 等)。该调用在当前线程的事件循环内创建任务,并在“后台”与事件循环中的所有其他任务同时执行,并在它们之间进行切换。defasync defasyncio.create_task()create_task()async defTaskawaitcancelawait
需要在调用 之前创建一个事件循环create_task(),并且在uvicorn以编程方式(例如使用uvicorn.run(app))或在终端中(例如使用uvicorn app:app)启动服务器时已经创建了事件循环。除了使用之外asyncio.create_task(),还可以使用asyncio.get_running_loop()来获取当前事件loop,然后调用loop.create_task()。
下面的示例使用最近记录的方式来添加lifespan事件(使用上下文管理器),即应在应用程序启动之前以及应用程序关闭时执行的代码(请参阅文档以及此答案以及此答案以获取更多详细信息和示例)。人们仍然可以使用startup和shutdown事件,如前面的选项所示;但是,这些事件处理程序可能会从未来的 FastAPI/Starlette 版本中删除。
import time
import threading
from fastapi import FastAPI
import uvicorn
app = FastAPI()
class BackgroundTasks(threading.Thread):
def run(self,*args,**kwargs):
while True:
print('Hello')
time.sleep(5)
if __name__ == '__main__':
t = BackgroundTasks()
t.start()
uvicorn.run(app, host="0.0.0.0", port=8000)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
27447 次 |
| 最近记录: |