如何在 FastAPI 中进行多处理

Cry*_*ofa 18 python multiprocessing python-asyncio fastapi uvicorn

在处理 FastAPI 请求时,我有一个受 CPU 限制的任务要对列表的每个元素执行。我想在多个 CPU 内核上进行此处理。

在 FastAPI 中执行此操作的正确方法是什么?我可以使用标准multiprocessing模块吗?到目前为止,我发现的所有教程/问题仅涵盖 I/O 绑定任务,如 Web 请求。

ale*_*ame 33

async def 端点

您可以使用loop.run_in_executorProcessPoolExecutor在单独的进程中启动功能。

@app.post("/async-endpoint")
async def test_endpoint():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_bound_func)  # wait result
Run Code Online (Sandbox Code Playgroud)

def 端点

由于def端点在单独的线程中隐式运行,您可以使用模块multiprocessingconcurrent.futures的全部功能。请注意,内部def函数,await可能无法使用。样品:

@app.post("/def-endpoint")
def test_endpoint():
    ...
    with multiprocessing.Pool(3) as p:
        result = p.map(f, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)
@app.post("/def-endpoint/")
def test_endpoint():
    ...
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
      results = executor.map(f, [1, 2, 3])
Run Code Online (Sandbox Code Playgroud)

注意应该记住,在端点中创建进程池以及创建大量线程会随着请求数量的增加而导致响应变慢。


即时执行

在单独的进程中执行函数并立即等待结果的最简单和最原生的方法是将loop.run_in_executorProcessPoolExecutor一起使用。

如下例所示,可以在应用程序启动时创建一个池,并且不要忘记在应用程序退出时关闭。可以使用max_workers ProcessPoolExecutor构造函数参数设置池中使用的进程数。如果max_workersNone或者没有给出,将默认为机器上的处理器数量。

这种方法的缺点是请求处理程序(路径操作)在单独的进程中等待计算完成,而客户端连接保持打开状态。如果由于某种原因连接丢失,那么结果将无处可返回。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from fastapi import FastAPI

from calc import cpu_bound_func

app = FastAPI()


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


@app.get("/{param}")
async def handler(param: int):
    res = await run_in_process(cpu_bound_func, param)
    return {"result": res}


@app.on_event("startup")
async def on_startup():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
Run Code Online (Sandbox Code Playgroud)

移至背景

通常,CPU 密集型任务在后台执行。FastAPI提供了能够运行后台任务来运行返回响应,在其内部可以启动和异步等待你的CPU绑定的任务的结果。

在这种情况下,例如,您可以立即返回"Accepted"(HTTP 代码 202)的响应和唯一的 task ID,在后台继续计算,客户端稍后可以使用 this 请求任务的状态ID

BackgroundTasks提供一些功能,特别是,您可以运行其中的几个(包括在依赖项中)。并且在其中可以使用依赖中获取的资源,只有在所有任务完成后才会被清理,而在出现异常的情况下可以正确处理它们。在这张图中可以更清楚地看到这一点。

下面是一个执行最小任务跟踪的示例。假定应用程序运行的一个实例。

import asyncio
from concurrent.futures.process import ProcessPoolExecutor
from http import HTTPStatus

from fastapi import BackgroundTasks
from typing import Dict
from uuid import UUID, uuid4
from fastapi import FastAPI
from pydantic import BaseModel, Field

from calc import cpu_bound_func


class Job(BaseModel):
    uid: UUID = Field(default_factory=uuid4)
    status: str = "in_progress"
    result: int = None


app = FastAPI()
jobs: Dict[UUID, Job] = {}


async def run_in_process(fn, *args):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)  # wait and return result


async def start_cpu_bound_task(uid: UUID, param: int) -> None:
    jobs[uid].result = await run_in_process(cpu_bound_func, param)
    jobs[uid].status = "complete"


@app.post("/new_cpu_bound_task/{param}", status_code=HTTPStatus.ACCEPTED)
async def task_handler(param: int, background_tasks: BackgroundTasks):
    new_task = Job()
    jobs[new_task.uid] = new_task
    background_tasks.add_task(start_cpu_bound_task, new_task.uid, param)
    return new_task


@app.get("/status/{uid}")
async def status_handler(uid: UUID):
    return jobs[uid]


@app.on_event("startup")
async def startup_event():
    app.state.executor = ProcessPoolExecutor()


@app.on_event("shutdown")
async def on_shutdown():
    app.state.executor.shutdown()
Run Code Online (Sandbox Code Playgroud)

更强大的解决方案

上述所有例子是非常简单的,但如果你需要一些更强大的系统重分布式计算,那么你可以看看一旁信息经纪人RabbitMQKafkaNATS等和图书馆使用它们像芹菜。

  • 如果后台执行是的,但我修改了返回示例的答案。 (2认同)