use*_*_12 13 python api python-3.x fastapi uvicorn
我已经部署了一个 fastapi 端点,
from fastapi import FastAPI, UploadFile
from typing import List
# FastAPI application instance that the route decorator below registers on.
app = FastAPI()
@app.post('/work/test')
async def testing(files: List[UploadFile]):
    """Process every uploaded file and confirm once all of them are handled.

    Args:
        files: The uploaded files sent with the request.

    Returns:
        ``{"response": "OK"}`` after the loop over all files has finished.
    """
    # NOTE: the annotation must subscript the generic (List[UploadFile]);
    # calling it as List(UploadFile) raises a TypeError at import time.
    for i in files:
        # do a lot of operations on each file
        # after that I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        ...
    # just returning "OK" to confirm data is written into mysql
    return {"response": "OK"}
Run Code Online (Sandbox Code Playgroud)
我可以从 API 端点请求输出,它对我来说工作得很好。
现在,对我来说最大的挑战是知道每次迭代需要多少时间。因为在 UI 部分(那些访问我的 API 端点的人)我想帮助他们为正在处理的每个迭代/文件显示一个进度条(TIME TAKEN)。
我有什么可能的方法来实现它吗?如果是这样,请帮助我了解如何进一步处理?
谢谢你。
ale*_*ame 17
跟踪任务进度的最佳方法是轮询:
request在后端启动任务:
使用外部观察者(例如 Observer、producer-consumer 模式),它将监视任务的状态及其结果,并且还会更新存储中的信息。然后在 client side(front-end)上启动对端点 /task/{ID}/status 的任务状态轮询周期,该端点从任务存储中获取信息。流式传输是另一种定期获取请求处理状态的方法,但不太方便:服务器在不关闭连接的情况下逐步推送响应。它有许多明显的缺点,例如,如果连接断开,您可能会丢失信息。Streaming Api 是 REST Api 之外的另一种方法。
您还可以使用websockets进行实时通知和双向通信。
使用 django + celery 的示例可以在以下链接中找到:https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html
https://buildwithdjango.com/blog/post/celery-progress-bars/
您可以在后台运行任务,返回其 id,并提供一个供前端定期调用的 /status 端点。在状态响应中,您可以返回任务当前的状态(例如,挂起,或当前正在处理的文件的编号)。我在这里提供了一些简单的例子。
使用异步任务的方法演示(单工作解决方案):
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import Optional  # needed for the corrected ``result`` annotation


class Job(BaseModel):
    """State of one background job as returned by the API endpoints."""

    # Unique identifier, generated per instance.
    uid: UUID = Field(default_factory=uuid4)
    # Lifecycle marker; a new job starts as "in_progress".
    status: str = "in_progress"
    # Latest progress value recorded for the job.
    progress: int = 0
    # Defaults to None, so the annotation has to allow None as well.
    result: Optional[int] = None
# FastAPI application instance that the route decorators in this snippet use.
app = FastAPI()
# In-memory registry of Job objects keyed by their UUID.
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
    """Run ``long_task`` and mirror its progress into ``jobs[uid]``.

    Args:
        uid: Key of the job entry in ``jobs`` to update.
        param: Forwarded unchanged to ``long_task``.

    Raises:
        Exception: Whatever the worker task raised; the job is marked
            "failed" before the exception is re-raised.
    """
    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))
    # Compare against the None sentinel explicitly so that a falsy progress
    # value such as 0 cannot end the monitoring loop early.
    while (progress := await queue.get()) is not None:  # monitor task progress
        jobs[uid].progress = progress
    try:
        await task  # surface any exception raised inside the worker
    except Exception:
        jobs[uid].status = "failed"
        raise
    jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    """Register a new job, schedule its work in the background and return it.

    Args:
        background_tasks: Collector the worker coroutine is added to.
        param: Path parameter forwarded to ``start_new_task``.

    Returns:
        The freshly created ``Job``; the route responds with 202 Accepted.
    """
    new_task = Job()
    # Store the job before scheduling so the worker can find its entry.
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    """Return the stored job for *uid*.

    Raises:
        HTTPException: 404 when no job with that identifier is stored
            (previously an unhandled ``KeyError``).
    """
    from fastapi import HTTPException  # local import: only this handler needs it

    try:
        return jobs[uid]
    except KeyError:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Job not found"
        ) from None
Run Code Online (Sandbox Code Playgroud)
后台处理函数被定义为 def,因此 FastAPI 会在线程池上运行它。
import time
from http import HTTPStatus
from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field
class Job(BaseModel):
    """Progress record of one file-processing job."""

    # Unique identifier, generated per instance.
    uid: UUID = Field(default_factory=uuid4)
    # Lifecycle marker; a new job starts as "in_progress".
    status: str = "in_progress"
    # Names of the files handled so far; default_factory gives every
    # instance its own list.
    processed_files: List[str] = Field(default_factory=list)
# FastAPI application instance that the route decorators in this snippet use.
app = FastAPI()
# In-memory registry of Job objects keyed by their UUID.
jobs: Dict[UUID, Job] = {}
def process_files(task_id: UUID, files: List[UploadFile]):
    """Process each uploaded file and record its name on the job entry.

    Args:
        task_id: Key of the job entry in ``jobs`` to update.
        files: Uploaded files to work through, in order.

    Raises:
        Exception: Whatever the processing raised; the job is marked
            "failed" before the exception is re-raised.
    """
    job = jobs[task_id]
    try:
        for upload in files:
            time.sleep(5)  # pretend long task
            # ...
            # do a lot of operations on each file
            # then append the processed file to a list
            # ...
            job.processed_files.append(upload.filename)
    except Exception:
        # Record the failure so pollers do not see "in_progress" forever.
        job.status = "failed"
        raise
    job.status = "completed"
@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    """Register a job for the uploaded files and process them in the background.

    Args:
        background_tasks: Collector the processing function is added to.
        files: Uploaded files from the request (required).

    Returns:
        The freshly created ``Job``; the route responds with 202 Accepted.
    """
    new_task = Job()
    # Store the job before scheduling so the worker can find its entry.
    jobs[new_task.uid] = new_task
    # NOTE(review): the UploadFile objects outlive the request here; confirm
    # they are still readable in the background task for the FastAPI version in use.
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task
@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    """Return the stored job for *uid*.

    Raises:
        HTTPException: 404 when no job with that identifier is stored
            (previously an unhandled ``KeyError``).
    """
    from fastapi import HTTPException  # local import: only this handler needs it

    try:
        return jobs[uid]
    except KeyError:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Job not found"
        ) from None
Run Code Online (Sandbox Code Playgroud)
import asyncio
from http import HTTPStatus
from fastapi import BackgroundTasks
from typing import Dict, List
from uuid import UUID, uuid4
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import Optional  # needed for the corrected ``result`` annotation


class Job(BaseModel):
    """State of one background job as returned by the API endpoints."""

    # Unique identifier, generated per instance.
    uid: UUID = Field(default_factory=uuid4)
    # Lifecycle marker; a new job starts as "in_progress".
    status: str = "in_progress"
    # Latest progress value recorded for the job.
    progress: int = 0
    # Defaults to None, so the annotation has to allow None as well.
    result: Optional[int] = None
# FastAPI application instance that the route decorators in this snippet use.
app = FastAPI()
# In-memory registry of Job objects keyed by their UUID.
jobs: Dict[UUID, Job] = {} # Dict as job storage
async def long_task(queue: asyncio.Queue, param: int):
for i in range(1, param): # do work and return our progress
await asyncio.sleep(1)
await queue.put(i)
await queue.put(None)
async def start_new_task(uid: UUID, param: int) -> None:
    """Run ``long_task`` and mirror its progress into ``jobs[uid]``.

    Args:
        uid: Key of the job entry in ``jobs`` to update.
        param: Forwarded unchanged to ``long_task``.

    Raises:
        Exception: Whatever the worker task raised; the job is marked
            "failed" before the exception is re-raised.
    """
    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))
    # Compare against the None sentinel explicitly so that a falsy progress
    # value such as 0 cannot end the monitoring loop early.
    while (progress := await queue.get()) is not None:  # monitor task progress
        jobs[uid].progress = progress
    try:
        await task  # surface any exception raised inside the worker
    except Exception:
        jobs[uid].status = "failed"
        raise
    jobs[uid].status = "complete"
@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    """Register a new job, schedule its work in the background and return it.

    Args:
        background_tasks: Collector the worker coroutine is added to.
        param: Path parameter forwarded to ``start_new_task``.

    Returns:
        The freshly created ``Job``; the route responds with 202 Accepted.
    """
    new_task = Job()
    # Store the job before scheduling so the worker can find its entry.
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task
@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    """Return the stored job for *uid*.

    Raises:
        HTTPException: 404 when no job with that identifier is stored
            (previously an unhandled ``KeyError``).
    """
    from fastapi import HTTPException  # local import: only this handler needs it

    try:
        return jobs[uid]
    except KeyError:
        raise HTTPException(
            status_code=HTTPStatus.NOT_FOUND, detail="Job not found"
        ) from None
Run Code Online (Sandbox Code Playgroud)
以下是使用唯一(unique)标识符和全局可用字典的解决方案,该字典中保存有关作业的信息:
注意:只要您使用动态键值(示例中使用的是 uuid)并将应用程序保持在单个进程内,下面的代码就可以安全使用。
将代码保存为 main.py,运行 uvicorn main:app --reload,然后访问 http://127.0.0.1:8000/ 创建作业;访问 http://127.0.0.1:8000/status 页面查看所有作业的状态;访问 http://127.0.0.1:8000/status/{identifier} 按作业 ID 查看作业的进度。应用程序代码:
from fastapi import FastAPI, UploadFile
import uuid
from typing import List
import asyncio
# Module-level state: context['jobs'] maps a job identifier to its info dict.
context = {'jobs': {}}
# FastAPI application instance that the route decorators in this snippet use.
app = FastAPI()
async def do_work(job_key, files=None):
    """Work through *files* and publish progress under ``context['jobs'][job_key]``.

    Args:
        job_key: Identifier of an existing entry in ``context['jobs']``.
        files: Items to process; when empty or None, 100 dummy iterations
            are run instead.
    """
    # Look the entry up once, before the loop, so the final status update
    # also works when there is nothing to iterate over.
    job_info = context['jobs'][job_key]
    iter_over = files if files else range(100)
    # enumerate() yields (index, item): the index is the iteration number.
    for file_number, _file in enumerate(iter_over):
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    job_info['status'] = 'done'
@app.post('/work/test')
async def testing(files: List[UploadFile]):
    """Create a job for the uploaded files and start it without waiting.

    Args:
        files: Uploaded files handed to ``do_work``.

    Returns:
        ``{"identifier": <job id>}`` for later status lookups.
    """
    identifier = str(uuid.uuid4())
    # The key is the string 'jobs'; the bare name ``jobs`` is undefined here.
    context['jobs'][identifier] = {}
    # The handler already runs on the event loop, so schedule the coroutine
    # directly; run_coroutine_threadsafe is intended for other threads.
    asyncio.create_task(do_work(identifier, files))
    return {"identifier": identifier}
@app.get('/')
async def get_testing():
    """Create a demo job without files and start it without waiting.

    Returns:
        ``{"identifier": <job id>}`` for later status lookups.
    """
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    # The handler already runs on the event loop, so schedule the coroutine
    # directly; run_coroutine_threadsafe is intended for other threads.
    asyncio.create_task(do_work(identifier))
    return {"identifier": identifier}
@app.get('/status')
def status():
    """Return the info dicts of every job stored in ``context['jobs']``."""
    return {
        'all': list(context['jobs'].values()),
    }
@app.get('/status/{identifier}')
async def status(identifier):
    """Return the info dict of one job, or a fallback message if unknown.

    Args:
        identifier: Job identifier taken from the URL path.

    Returns:
        ``{"status": ...}`` holding the job's info dict, or an explanatory
        string when no job with that identifier is stored.
    """
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
Run Code Online (Sandbox Code Playgroud)