How to send the progress of an operation in a FastAPI application?

use*_*_12 13 python api python-3.x fastapi uvicorn

I have deployed a FastAPI endpoint:

from fastapi import FastAPI, UploadFile
from typing import List

app = FastAPI()

@app.post('/work/test')
async def testing(files: List[UploadFile]):
    for i in files:
        .......
        # do a lot of operations on each file

        # after that I am just writing that processed data into mysql database
        # cur.execute(...)
        # cur.commit()
        .......
    
    # just returning "OK" to confirm data is written into mysql
    return {"response" : "OK"}

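I am able to request output from the API endpoint, and it works perfectly fine for me.

Now, the biggest challenge for me is knowing how much time each iteration takes. In the UI part (for those who access my API endpoint), I want to show a progress bar (TIME TAKEN) for every iteration/file being processed.

Is there any possible way for me to achieve this? If so, please help me understand how to proceed.

Thank you.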

ale*_*ame 17

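Approaches

Polling

The most preferred approach to tracking the progress of a task is polling:

  1. After receiving a request, start a task on the backend:
    1. Create a task object in storage (e.g. in memory, redis, etc.). The task object must contain the following data: task ID, status (pending, completed), result, and others.
    2. Run the task in the background (coroutines, threading, multiprocessing, or a task queue such as Celery, arq, aio-pika, dramatiq, etc.).
    3. Respond immediately with 202 (Accepted), returning the task ID created earlier.
  2. Update the task status:
    1. This can be done from within the task itself, if it knows about the task storage and has access to it. The task periodically updates the information about itself.
    2. Or use a task monitor (Observer, producer-consumer pattern), which watches the status of the task and its result, and updates the information in the storage.
  3. On the client side (front-end), start a polling cycle against the task status endpoint /task/{ID}/status, which reads its information from the task storage.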
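Step 3 above (the client-side polling cycle) could be sketched as follows. This is a minimal illustration, not part of the original answer: the function name `poll_until_done`, the `status` field, and the default `final_statuses` values are assumptions, and the HTTP call is abstracted behind a `fetch_status` callable so that the loop itself stays independent of any HTTP library.

```python
import time
from typing import Callable, Dict, Iterable, Optional


def poll_until_done(
    fetch_status: Callable[[], Dict],
    interval: float = 1.0,
    timeout: float = 60.0,
    final_statuses: Iterable[str] = ("completed",),  # assumed status names
    on_progress: Optional[Callable[[Dict], None]] = None,
) -> Dict:
    """Call fetch_status() repeatedly until the job reports a final status."""
    final = set(final_statuses)
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()      # e.g. GET /task/{ID}/status parsed as JSON
        if on_progress is not None:
            on_progress(job)      # e.g. redraw a progress bar
        if job.get("status") in final:
            return job            # the job is finished: stop polling
        if time.monotonic() >= deadline:
            raise TimeoutError("job did not reach a final status in time")
        time.sleep(interval)      # wait before asking again
```

In a real client, `fetch_status` would typically wrap an HTTP GET against the status endpoint and return the decoded JSON body; the polling interval is a trade-off between UI responsiveness and load on the server.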

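Streaming response

Streaming is a less convenient way of periodically getting the status of request processing: the response is pushed gradually without closing the connection. It has a number of significant disadvantages; for example, if the connection is broken, you can lose information. A streaming API is a different approach from a REST API.

Websockets

You can also use websockets for real-time notifications and bidirectional communication.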

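Links:

  • Examples of the polling approach for a progress bar, and a more detailed description for django + celery, can be found at these links:

https://www.dangtrinh.com/2013/07/django-celery-display-progress-bar-of.html

https://buildwithdjango.com/blog/post/celery-progress-bars/

  • I have provided simplified examples of running background tasks in FastAPI using multiprocessing here: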

/sf/answers/4421970941/

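Old answer:

You could run the task in the background, return its id, and provide a /status endpoint that the front end calls periodically. In the status response, you could return the current state of the task (for example, pending, together with the number of the file currently being processed). I provide a few simple examples here.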

Demo

Polling

Demo of the approach using asyncio tasks (single-worker solution):

import asyncio
from http import HTTPStatus
from typing import Dict, Optional
from uuid import UUID, uuid4

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    progress: int = 0
    result: Optional[int] = None  # stays None until a result is stored


app = FastAPI()
jobs: Dict[UUID, Job] = {}  # Dict as job storage


async def long_task(queue: asyncio.Queue, param: int):
    for i in range(1, param):  # do work and return our progress
        await asyncio.sleep(1)
        await queue.put(i)
    await queue.put(None)


async def start_new_task(uid: UUID, param: int) -> None:

    queue = asyncio.Queue()
    task = asyncio.create_task(long_task(queue, param))

    while progress := await queue.get():  # monitor task progress
        jobs[uid].progress = progress

    jobs[uid].status = "complete"


@app.post("/new_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(background_tasks: BackgroundTasks, param: int):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_new_task, new_task.uid, param)
    return new_task


@app.get("/task/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]

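Adapted example for the loop from the question

The background processing function is defined with def, so FastAPI runs it on the thread pool.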

import time
from http import HTTPStatus

from fastapi import BackgroundTasks, UploadFile, File
from typing import Dict, List
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    processed_files: List[str] = Field(default_factory=list)


app = FastAPI()
jobs: Dict[UUID, Job] = {}


def process_files(task_id: UUID, files: List[UploadFile]):
    for i in files:
        time.sleep(5)  # pretend long task
        # ...
        # do a lot of operations on each file
        # then append the processed file to a list
        # ...
        jobs[task_id].processed_files.append(i.filename)
    jobs[task_id].status = "completed"


@app.post('/work/test', status_code=HTTPStatus.ACCEPTED)
async def work(background_tasks: BackgroundTasks, files: List[UploadFile] = File(...)):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(process_files, new_task.uid, files)
    return new_task


@app.get("/work/{uid}/status")
async def status_handler(uid: UUID):
    return jobs[uid]
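The question asks specifically for the time taken per file, which the adapted example does not record. One possible way to capture it, sketched under assumptions: the `TimedJob` class, its `durations` field, and the `process_with_timing` helper are hypothetical names, and a plain dataclass stands in for the pydantic model so that the snippet runs with the standard library only.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable


@dataclass
class TimedJob:
    status: str = "in_progress"
    # seconds spent on each item, keyed by item name (hypothetical field)
    durations: Dict[str, float] = field(default_factory=dict)


def process_with_timing(
    job: TimedJob, names: Iterable[str], work: Callable[[str], None]
) -> None:
    """Run work(name) for each item and record how long each one took."""
    for name in names:
        started = time.perf_counter()
        work(name)  # the real per-file processing goes here
        job.durations[name] = time.perf_counter() - started
    job.status = "completed"
```

Returning `durations` from the status endpoint would let the front end show both how many files are done and how long each took, and estimate the remaining time from the average so far.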

Streaming
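What follows is a minimal sketch of the streaming approach, not code taken from the answer. It assumes progress is reported as one plain-text line per processed file; the names `progress_stream` and `collect`, the line format, and the `delay` parameter are all illustrative. The generator is kept free of framework imports so that it can be run on its own.

```python
import asyncio
from typing import AsyncIterator, List


async def progress_stream(
    filenames: List[str], delay: float = 1.0
) -> AsyncIterator[str]:
    """Yield one text line per processed file, then a final "OK" line."""
    total = len(filenames)
    for index, name in enumerate(filenames, start=1):
        await asyncio.sleep(delay)  # stand-in for the real per-file work
        yield f"{index}/{total} {name} processed\n"
    yield "OK\n"


async def collect(stream: AsyncIterator[str]) -> List[str]:
    """Consume the stream chunk by chunk, the way a client would."""
    return [chunk async for chunk in stream]
```

In a FastAPI endpoint, such an async generator can be returned wrapped in `StreamingResponse` (from `fastapi.responses`), for example with `media_type="text/plain"`, so that each yielded line is sent to the client as soon as it is produced. As noted above, progress lines sent while the connection is down are lost, which is why polling is usually the safer choice.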



And*_*yko 9

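Below is a solution that uses unique identifiers and a globally available dictionary holding information about the jobs:

NOTE: The code below is safe to use as long as you use dynamic key values (a uuid in this sample) and keep the application within a single process.

  1. To start the application, create a file main.py
  2. Run uvicorn main:app --reload
  3. Create a job entry by accessing http://127.0.0.1:8000/
  4. Repeat step 3 to create multiple jobs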
  5. Go to the http://127.0.0.1:8000/status page to see the statuses of all jobs.
  6. Go to http://127.0.0.1:8000/status/{identifier} to see the progress of a job by its job id.

Application code:

from fastapi import FastAPI, UploadFile
import uuid
from typing import List


import asyncio


context = {'jobs': {}}

app = FastAPI()



async def do_work(job_key, files=None):
    iter_over = files if files else range(100)
    # enumerate() yields (index, item), so the counter comes first
    for file_number, file in enumerate(iter_over):
        jobs = context['jobs']
        job_info = jobs[job_key]
        job_info['iteration'] = file_number
        job_info['status'] = 'inprogress'
        await asyncio.sleep(1)
    # mark the job as finished in the shared job dictionary
    context['jobs'][job_key]['status'] = 'done'


@app.post('/work/test')
async def testing(files: List[UploadFile]):
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier, files), loop=asyncio.get_running_loop())

    return {"identifier": identifier}


@app.get('/')
async def get_testing():
    identifier = str(uuid.uuid4())
    context['jobs'][identifier] = {}
    asyncio.run_coroutine_threadsafe(do_work(identifier), loop=asyncio.get_running_loop())

    return {"identifier": identifier}

@app.get('/status')
def status():
    return {
        'all': list(context['jobs'].values()),
    }

@app.get('/status/{identifier}')
async def status(identifier):
    return {
        "status": context['jobs'].get(identifier, 'job with that identifier is undefined'),
    }
