FastAPI/Uvicorn request hangs during call_next (path operation function) in middleware

Cra*_*lot 2 python python-requests fastapi uvicorn

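We have a machine learning model running inside a Docker container on EC2.

We use Cortex.dev to autoscale GPUs.

Non-deterministically, requests hang while the call_next function in the FastAPI middleware is executing. Unfortunately, the hang is not reproducible.

The "Middleware pre-request" print line is logged, but the first print statement in the path operation function is never logged.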

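Things we have tried:

  • Running Uvicorn with 1 worker
  • Running the run function without async
  • Using bytes as the parameter type for image instead of UploadFile

None of these changes fixes the hang, but the configuration shown here is the most performant one.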

  1. Does this mean the problem lies in FastAPI rather than in Uvicorn?

  2. If so, what could cause FastAPI to hang? If not, where is the problem and how can it be fixed?

Dockerfile

FROM nvidia/cuda:11.4.0-runtime-ubuntu18.04

WORKDIR /usr/src/app

RUN apt-get -y update && \
    apt-get install -y --fix-missing \
    build-essential \
    cmake \
    python3 \
    python3-pip \
    ffmpeg \
    libsm6 \
    libxext6 \
    && apt-get clean && rm -rf /tmp/* /var/tmp/*

ADD ./requirements.txt ./

# install our dependencies
RUN python3 -m pip install --upgrade pip && python3 -m pip install -r requirements.txt && apt-get clean && rm -rf /tmp/* /var/tmp/*

ADD ./ ./

ENV LC_ALL=C.UTF-8 
ENV LANG=C.UTF-8

EXPOSE 8080

CMD uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2

api.py

from my_predictor import PythonPredictor
from typing import Optional
from datetime import datetime
import time
from starlette.responses import Response

from fastapi import FastAPI, File, UploadFile, Form, Response, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    cortex_id = request.headers.get('x-request-id')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)

    response = await call_next(request)

    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)

    return response



@app.post("/")
async def run(request: Request, image: UploadFile = File(...), renderFactor:Optional[int] = Form(12), requestId:Optional[str] = Form('-1'),include_header:Optional[str] = Form('bin')):

    try:
        cortexId = request.headers.get('x-request-id')
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " >>> Request received. Time stamp: " + str(datetime.now()))

        start = time.time()
    
        image = await image.read()

        payload = {}
        payload['image'] = image
        payload['renderFactor'] = renderFactor
        payload['requestId'] = requestId
        payload['include_header'] = include_header
        
        response = pred.predict(payload)

        end = time.time()

        totalTime = round(end - start, 2)

        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Request processed. Duration: " + str(totalTime) + " seconds. Time stamp: " + str(datetime.now()))

        if totalTime > 5:
            print("Long request detected. Duration: " + str(totalTime))

        return response
        
    except Exception as error:
        end = time.time()
        print(str(error))
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Error. Duration: " + str(round(end - start, 2)) + " seconds . Time stamp: " + str(datetime.now()))

        raise HTTPException(status_code = 500, detail = str(error))

config = {}
pred = PythonPredictor(config)

rag*_*ria 9

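Basic information, root cause, and a rant

Hey, I spent quite a lot of time stuck on this hanging issue (for a critical app in my org that has several custom middlewares). The hang occurs basically because middlewares registered with @app.middleware("http") are implemented behind the scenes by inheriting from Starlette's BaseHTTPMiddleware. So the problem also exists for middlewares written by explicitly inheriting from BaseHTTPMiddleware. The reasons are fairly complex; this is what I have understood so far:

  1. Here (GitHub Starlette issue) and here (GitHub FastAPI issue): I learned that this approach uses StreamingResponse, which has some issues.
  2. Here (GitHub Starlette issue): I learned that one cause of the hang is that request.json() may be awaited only once in an API's request lifecycle, and BaseHTTPMiddleware also creates a Request object of its own (which causes the hang, since that is another request).

The last link also mentions that another cause of the hang is that, because of StreamingResponse, reading the response is somehow exhausted on the first read, and when it comes to the second read it keeps waiting for it indefinitely, which causes the hang. ("First" and "second" read here mean: in ASGI apps, messages of various types, such as http.response.start and http.response.body, are sent to and from the client and the app.)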
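The "exhausted on the first read, waits forever on the second" behaviour can be illustrated with a deliberately simplified model. This is a sketch, not Starlette's actual code: the asyncio.Queue stands in for the ASGI receive channel, and read_body and second_read_hangs are hypothetical helper names. A timeout is added only so that the demo terminates instead of hanging.

```python
import asyncio


async def read_body(receive):
    """Drain "http.request" messages until more_body is False."""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            return b"".join(chunks)


async def second_read_hangs(timeout=0.1):
    # Assumption: the whole request body arrives as one ASGI message.
    queue = asyncio.Queue()
    await queue.put({"type": "http.request", "body": b'{"x": 1}', "more_body": False})
    receive = queue.get  # awaiting receive() pops the next message

    # First reader consumes the only message on the channel.
    first = await read_body(receive)

    # A second, independent reader finds nothing left and would wait forever;
    # the timeout makes that visible without blocking the demo.
    try:
        await asyncio.wait_for(read_body(receive), timeout)
        second = "completed"
    except asyncio.TimeoutError:
        second = "timed out"
    return first, second


first, second = asyncio.run(second_read_hangs())
print(first, second)
```

In this model the second reader plays the role of a second Request object built on the same receive channel: the data is already gone, so the await never completes on its own.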

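Solution

So, don't use anything related to BaseHTTPMiddleware. To work around this, I wrote all my custom middlewares against the ASGI specification given here.

You can write a custom middleware like this: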

import time

from starlette.types import ASGIApp, Message, Receive, Scope, Send

class LogProcessingTime:

    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:

        start_time = time.time()

        async def send_wrapper(message: Message) -> None:
            # This captures the response coming from the app layer.
            # The response body is in the message whose type is
            # "http.response.body".
            if message["type"] == "http.response.body":
                process_time = time.time() - start_time
                # you can log this process_time now any way you prefer

            await send(message)

        await self.app(scope, receive, send_wrapper)


# you can add this to your app this way:
app.add_middleware(LogProcessingTime)

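Applied to the middleware from the question, the same pattern might look roughly like the sketch below. The class name RequestTimingMiddleware, the log parameter, the "unknown" fallback for a missing x-request-id header, and the tiny demo_app are all assumptions made for illustration. The sketch uses only the standard library, so it can be run without FastAPI installed.

```python
import asyncio
import time


class RequestTimingMiddleware:
    """Pure ASGI middleware: logs before the request and after the response."""

    def __init__(self, app, log=print):
        self.app = app
        self.log = log  # hypothetical hook; defaults to print

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            # Pass websocket/lifespan traffic through untouched.
            await self.app(scope, receive, send)
            return

        # ASGI headers are (name, value) byte pairs with lowercased names.
        headers = dict(scope.get("headers", []))
        cortex_id = headers.get(b"x-request-id", b"unknown").decode("latin-1")
        start_time = time.time()
        self.log(f"Cortex ID: {cortex_id}. > Middleware pre-request. Time stamp: {start_time}")

        async def send_wrapper(message):
            # Log once, when the final body chunk is sent.
            if message["type"] == "http.response.body" and not message.get("more_body", False):
                duration = time.time() - start_time
                self.log(f"Cortex ID: {cortex_id}. > Middleware post-response. Duration: {duration}")
            await send(message)

        await self.app(scope, receive, send_wrapper)


async def demo_app(scope, receive, send):
    # Stand-in for the real application: a fixed 200 response.
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def demo():
    logs, sent = [], []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "headers": [(b"x-request-id", b"abc123")]}
    await RequestTimingMiddleware(demo_app, log=logs.append)(scope, receive, send)
    return logs, sent


logs, sent = asyncio.run(demo())
print(logs)
```

With a real FastAPI app this would be registered as app.add_middleware(RequestTimingMiddleware) in place of the @app.middleware("http") function. Checking more_body keeps the post-response line from being logged once per chunk when the response is streamed.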