Cra*_*lot 2 python python-requests fastapi uvicorn
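We have a machine learning model inside a Docker container running on EC2.
We use Cortex.dev to autoscale the GPUs.
Non-deterministically, requests hang during the execution of the call_next function in the FastAPI middleware. Unfortunately, the problem is not reproducible.
The "Middleware pre-request" print line is logged, but the first print statement in the path operation function is never logged.
Things we tried:
- Running the run function without async
- Using bytes as the parameter type for image instead of UploadFile
None of these changes fixes the hanging issue, but this is the most performant configuration.
Does this mean the problem lies in FastAPI rather than Uvicorn?
If so, what could cause FastAPI to hang? If not, where is the problem and how can we fix it?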
Dockerfile
FROM nvidia/cuda:11.4.0-runtime-ubuntu18.04
WORKDIR /usr/src/app
RUN apt-get -y update && \
    apt-get install -y --fix-missing \
    build-essential \
    cmake \
    python3 \
    python3-pip \
    ffmpeg \
    libsm6 \
    libxext6 \
    && apt-get clean && rm -rf /tmp/* /var/tmp/*
ADD ./requirements.txt ./
# install our dependencies
RUN python3 -m pip install --upgrade pip && python3 -m pip install -r requirements.txt && apt-get clean && rm -rf /tmp/* /var/tmp/*
ADD ./ ./
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
EXPOSE 8080
CMD uvicorn api:app --host 0.0.0.0 --port 8080 --workers 2
api.py
from my_predictor import PythonPredictor
from typing import Optional
from datetime import datetime
import time
# Response is re-exported by FastAPI, so the separate starlette import is not needed
from fastapi import FastAPI, File, UploadFile, Form, Response, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    # Fall back to a placeholder so a missing header does not break the string concatenation
    cortex_id = request.headers.get('x-request-id', 'unknown')
    start_time = time.time()
    print("Cortex ID: " + cortex_id + ". > Middleware pre-request. Time stamp: " + str(start_time), flush=True)
    response = await call_next(request)
    process_time = time.time() - start_time
    print("Cortex ID: " + cortex_id + ". > Middleware post-response. Duration: " + str(process_time), flush=True)
    return response

@app.post("/")
async def run(
    request: Request,
    image: UploadFile = File(...),
    renderFactor: Optional[int] = Form(12),
    requestId: Optional[str] = Form('-1'),
    include_header: Optional[str] = Form('bin'),
):
    # Set these before the try block so the except branch can always use them
    cortexId = request.headers.get('x-request-id', 'unknown')
    start = time.time()
    try:
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " >>> Request received. Time stamp: " + str(datetime.now()))
        image = await image.read()
        payload = {}
        payload['image'] = image
        payload['renderFactor'] = renderFactor
        payload['requestId'] = requestId
        payload['include_header'] = include_header
        response = pred.predict(payload)
        end = time.time()
        totalTime = round(end - start, 2)
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Request processed. Duration: " + str(totalTime) + " seconds. Time stamp: " + str(datetime.now()))
        if totalTime > 5:
            print("Long request detected. Duration: " + str(totalTime))
        return response
    except Exception as error:
        end = time.time()
        print(str(error))
        print("Cortex ID: " + cortexId + ". Request ID: " + requestId + " > Error. Duration: " + str(round(end - start, 2)) + " seconds. Time stamp: " + str(datetime.now()))
        raise HTTPException(status_code=500, detail=str(error))

config = {}
pred = PythonPredictor(config)
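Hey, I spent a fair amount of time stuck on this hanging issue (for critical apps in my organization that have multiple custom middlewares). The hang happens because middlewares based on @app.middleware("http") are created behind the scenes by inheriting from Starlette's BaseHTTPMiddleware. So the problem also exists for middlewares written by explicitly inheriting from BaseHTTPMiddleware. The reasons are fairly complex; this is what I have understood so far:
- There are some issues with StreamingResponse.
- Awaiting request.json() is allowed only once in the request life cycle of the API, and BaseHTTPMiddleware also creates a Request object of its own (which causes the hang, since that is another request).
The discussion behind the last point also mentions that StreamingResponse contributes to the hang: the response stream somehow gets exhausted on the first read, and the second read then waits for it indefinitely. (First and second read here mean: in ASGI apps, messages of various types, such as http.response.start and http.response.body, are sent between the client and the app.)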
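To make the "first read / second read" point concrete, here is a minimal sketch that uses only asyncio and no Starlette. The helpers make_receive and read_body and the 0.1 second timeout are hypothetical names and values chosen for illustration; a real ASGI server supplies the receive callable itself and behaves in more detail than this. The sketch shows that an ASGI request body is a one-shot stream: the first reader drains it, and a second reader waits for a message that never arrives.

```python
import asyncio


def make_receive(body: bytes):
    """Build a hypothetical ASGI `receive` callable that delivers one body message."""
    queue: asyncio.Queue = asyncio.Queue()
    queue.put_nowait({"type": "http.request", "body": body, "more_body": False})

    async def receive():
        # Once the queue is empty this waits forever, like a server
        # that has nothing more to send for this request.
        return await queue.get()

    return receive


async def read_body(receive) -> bytes:
    """Drain the request body, roughly the way a Request object would."""
    chunks = []
    while True:
        message = await receive()
        chunks.append(message.get("body", b""))
        if not message.get("more_body", False):
            return b"".join(chunks)


async def demo():
    receive = make_receive(b"payload")
    first = await read_body(receive)
    try:
        # A second, independent reader finds the stream already exhausted.
        second = await asyncio.wait_for(read_body(receive), timeout=0.1)
    except asyncio.TimeoutError:
        second = None  # without the timeout, this await would never return
    return first, second


first_read, second_read = asyncio.run(demo())
print(first_read, second_read)
```

The timeout exists only so the demonstration terminates. In a middleware that wraps the same receive channel in a second Request object, nothing cancels the second read, which matches the hang described above.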
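So, stay away from anything related to BaseHTTPMiddleware. To work around the problem, I wrote all my custom middlewares directly against the ASGI specification.
You can write a custom middleware like this: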
import time

from starlette.types import ASGIApp, Message, Receive, Scope, Send


class LogProcessingTime:
    def __init__(self, app: ASGIApp) -> None:
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        # Pass non-HTTP traffic (lifespan, websocket) straight through
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        start_time = time.time()

        async def send_wrapper(message: Message) -> None:
            # This captures the response coming from the app layer;
            # the response body arrives in messages of type "http.response.body"
            if message["type"] == "http.response.body":
                process_time = time.time() - start_time
                # you can log this process_time now any way you prefer
            await send(message)

        await self.app(scope, receive, send_wrapper)


# you can add this to your app this way:
app.add_middleware(LogProcessingTime)
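
Applied to the middleware from the question, the same pattern might look like the sketch below. The class name RequestTimingMiddleware, the injectable log parameter, and the stand-in hello_app are assumptions made for illustration and are not part of FastAPI or Starlette. The example drives the middleware by hand with plain asyncio, so it runs without a server.

```python
import asyncio
import time


class RequestTimingMiddleware:
    """Pure ASGI middleware: logs before the request and after the response."""

    def __init__(self, app, log=print):
        self.app = app
        self.log = log  # injectable so the example can capture the output

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # ASGI headers are a list of (name, value) byte pairs with lower-cased names
        headers = dict(scope.get("headers", []))
        cortex_id = headers.get(b"x-request-id", b"unknown").decode("latin-1")
        start_time = time.time()
        self.log(f"Cortex ID: {cortex_id}. > Middleware pre-request. Time stamp: {start_time}")

        async def send_wrapper(message):
            # Log once, when the final body chunk goes out
            if message["type"] == "http.response.body" and not message.get("more_body", False):
                duration = time.time() - start_time
                self.log(f"Cortex ID: {cortex_id}. > Middleware post-response. Duration: {duration}")
            await send(message)

        await self.app(scope, receive, send_wrapper)


async def hello_app(scope, receive, send):
    """Tiny stand-in ASGI app used only to exercise the middleware."""
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": b"ok"})


async def demo():
    logs, sent = [], []

    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        sent.append(message)

    scope = {"type": "http", "method": "POST", "path": "/",
             "headers": [(b"x-request-id", b"abc123")]}
    wrapped = RequestTimingMiddleware(hello_app, log=logs.append)
    await wrapped(scope, receive, send)
    return logs, sent


logs, sent = asyncio.run(demo())
```

In a FastAPI app, this class would be registered with app.add_middleware(RequestTimingMiddleware) in place of the @app.middleware("http") function, so no BaseHTTPMiddleware is involved.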