从 FastAPI 中的后台任务获取返回状态

use*_*r12 6 python-3.x fastapi

我有一个 API,它发布创建后台作业的作业,我想在另一个 GET api 上发送作业状态。如何实现这一目标?在background_work()函数中,我将使用多处理作为内部subprocess.call()调用的目标调用。

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def background_work(data: str):
    # some computation on data and return it
    return status

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(data: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(background_work, data)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    #how to return status of job submitted to background task

Run Code Online (Sandbox Code Playgroud)

Gab*_*lli 17

目前,使用 FastAPI 无法实现这一点,因为后台任务只是对发送响应后要调用的可调用对象的引用,它们不存储任何类型的状态。

您将必须使用 Celery 或其他库。

  • 您还可以尝试查看 WPS(Web 处理服务) - PyWPS 是其 Python 实现之一。PyWPS 可以作为单独的服务运行,并且可以从 FastAPI 应用程序内部使用“owslib.wps.WPSExecution”来控制 WPS 进程的状态。 (2认同)

Lok*_*oki 9

我正在像这样使用 fastAPI,结合使用concurrent.futures.ProcessPoolExecutor()asyncio 来管理长时间运行的作业。

If you don't want to rely on other modules (celery etc), you need to manage yourself the state of your job, and store it somewhere. I store it in the DB so that pending jobs can be resumed after a restart of the server.

Note that you must NOT perform CPU intensive computations in the background_tasks of the app, because it runs in the same async event loop that serves the requests and it will stall your app. Instead submit them to a thread pool or a process pool.

  • 将任务状态存储在数据库上是一个好主意。我刚刚实现了它,它工作得很好,任务本身负责更新它的状态。谢谢。这应该是主要答案,因为它确实解决了问题。另外:请记住检查启动时是否出现故障,以防应用程序崩溃。我正在使用 Fastapi Events:startup 将所有不成功的任务标记为失败 (2认同)
  • 谢谢。它对我也很有用。不过,必须小心对数据库的并发访问以更新状态。我在 sqlAlchemy Sessions 方面遇到了一些麻烦,我必须仔细阅读文档。您不能使用 fastAPI 示例中的依赖项,您需要在单独的线程或进程中运行的函数中创建一个新会话。 (2认同)

小智 6

试试这个模式:

import time
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

class TaskState:

    def __init__(self):
        self.counter = 0

    def background_work(self):
        while True:
            self.counter += 1
            time.sleep(1)

    def get_state(self):
        return self.counter

state = TaskState()

@app.post("/post_job", status_code=HTTP_201_CREATED)
async def send_notification(background_tasks: BackgroundTasks):
    background_tasks.add_task(state.background_work)
    return {"message": "Job Created, check status after some time!"}

@app.get("/get_status")
def status():
    return state.get_state()
Run Code Online (Sandbox Code Playgroud)

  • 天啊!这是一场并发噩梦。TaskState 是全局的,并且没有锁定机制,如果触发多个 send_notification,那么你就有大麻烦了。 (3认同)