在 Uvicorn/FastAPI 内发出下游 Https 请求的正确方法是什么?

Ste*_*veJ 5 python python-requests fastapi

我有一个 API 端点(FastAPI / Uvicorn)。除此之外,它还向另一个 API 请求信息。当我使用多个并发请求加载 API 时,我开始收到以下错误:

h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
Run Code Online (Sandbox Code Playgroud)

在正常环境中,我会利用request.session,但我知道它不是完全线程安全的。

因此,在 FastAPI 等框架内使用请求的正确方法是什么,其中多个线程将requests同时使用该库?

Chr*_*ris 9

除了使用 之外requests,您还可以使用httpx,它也提供了一个asyncAPI(执行测试时httpxFastAPI 的文档中也建议async,以及 FastAPI/Starlette 最近替换了上的 HTTP 客户端TestClientrequestshttpx)。

\n

下面的示例基于httpx文档中给出的示例,演示如何使用该库发出异步 HTTP(s) 请求,然后将响应流式传输回客户端。您httpx.AsyncClient()可以使用 来代替requests.Session(),当向同一主机发出多个请求时,这很有用,因为底层 TCP 连接将被重用,而不是为每个请求重新创建一个\xe2\x80\x94,从而导致显着的性能提升。此外,它还允许您跨请求重用headers和 其他设置(例如proxiestimeout)以及 persist cookies。您生成一个Client并在每次需要时重复使用它。完成后,您可以显await client.aclose()关闭客户端(您可以在shutdown事件处理程序中执行此操作)。示例和更多详细信息也可以在此答案中找到。

\n

例子

\n
from fastapi import FastAPI\nfrom starlette.background import BackgroundTask\nfrom fastapi.responses import StreamingResponse\nimport httpx\n\n\napp = FastAPI()\n\n\n@app.on_event("startup")\nasync def startup_event():\n    app.state.client = httpx.AsyncClient()\n\n\n@app.on_event(\'shutdown\')\nasync def shutdown_event():\n    await app.state.client.aclose()\n\n\n@app.get(\'/\')\nasync def home():\n    client = app.state.client\n    req = client.build_request(\'GET\', \'https://www.example.com/\')\n    r = await client.send(req, stream=True)\n    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose))\n
Run Code Online (Sandbox Code Playgroud)\n

示例(已更新)

\n

由于startupshutdown现已被弃用(并且将来可能会被完全删除),您可以改为使用lifespan处理程序来初始化httpx客户端,以及在关闭时关闭客户端实例,类似于此答案中演示的内容。Starlette在其文档页面中专门提供了一个使用lifespan处理程序和客户端的示例。正如Starlette 的文档httpx中所述:

\n
\n

具有lifespan的概念state,它是一个字典,\n可用于在生命周期和\n请求之间共享对象。

\n

请求中接收到的状态state是生命周期处理程序中接收到的状态的浅表副本。

\n
\n

因此,可以使用端点内部访问添加到生命周期处理程序中状态的对象request.state。下面的示例使用流响应来与外部服务器通信,并将响应发送回客户端。有关响应流方法(即、、等)的更多详细信息,请参阅此处。asynchttpxaiter_bytes()aiter_text()aiter_lines()

\n

如果您想使用 amedia_type表示StreamingResponse,您可以使用原始响应中的 ,如下所示:media_type=r.headers[\'content-type\']。但是,正如此答案中所述,您需要确保media_type未设置为text/plain; 否则,内容将不会按预期在浏览器中传输,除非您禁用MIME 嗅探(请查看链接的答案以获取更多详细信息和解决方案)。

\n
from fastapi import FastAPI, Request\nfrom contextlib import asynccontextmanager\nfrom fastapi.responses import StreamingResponse\nfrom starlette.background import BackgroundTask\nimport httpx\n\n\n@asynccontextmanager\nasync def lifespan(app: FastAPI):\n    # Initialise the Client on startup and add it to the state\n    async with httpx.AsyncClient() as client:\n        yield {\'client\': client}\n        # The Client closes on shutdown \n\n\napp = FastAPI(lifespan=lifespan)\n\n\n@app.get(\'/\')\nasync def home(request: Request):\n    client = request.state.client\n    req = client.build_request(\'GET\', \'https://www.example.com\')\n    r = await client.send(req, stream=True)\n    return StreamingResponse(r.aiter_raw(), background=BackgroundTask(r.aclose)) \n
Run Code Online (Sandbox Code Playgroud)\n

如果出于任何原因,您需要在响应客户端之前在服务器端逐块读取内容,您可以按如下方式执行此操作:

\n
@app.get(\'/\')\nasync def home(request: Request):\n    client = request.state.client\n    req = client.build_request(\'GET\', \'https://www.example.com\')\n    r = await client.send(req, stream=True)\n    \n    async def gen():\n        async for chunk in r.aiter_raw():\n            yield chunk\n        await r.aclose()\n        \n    return StreamingResponse(gen())\n
Run Code Online (Sandbox Code Playgroud)\n

如果您不想使用流式响应,而是首先读取httpx响应(这会将响应数据存储到服务器的 RAM 中;因此,您应该确保有足够的空间)可容纳数据),您可以使用以下内容。请注意,usingr.json()仅适用于响应数据为 JSON 格式的情况;否则,您可以直接返回一个PlainTextResponse或一个自定义Response,如下所示。

\n
from fastapi import Response\nfrom fastapi.responses import PlainTextResponse\n\n@app.get(\'/\')\nasync def home(request: Request):\n    client = request.state.client\n    req = client.build_request(\'GET\', \'https://www.example.com\')\n    r = await client.send(req)\n    content_type = r.headers.get(\'content-type\')\n    \n    if content_type == \'application/json\':\n        return r.json()\n    elif content_type == \'text/plain\':\n        return PlainTextResponse(content=r.text)\n    else:\n        return Response(content=r.content) \n
Run Code Online (Sandbox Code Playgroud)\n
\n

使用async的 APIhttpx意味着您必须使用async def;定义端点 否则,您将必须使用标准同步 API(对于defvsasync def请参阅此答案),如此github 讨论中所述:

\n
\n

是的。HTTPX 旨在是线程安全的,是的,跨所有线程的单个客户端实例在连接池方面比使用每个线程的实例会做得更好。

\n
\n

limits您还可以使用关键字参数来控制连接池大小Client(请参阅池限制配置)。例如:

\n
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)\nclient = httpx.Client(limits=limits)\n
Run Code Online (Sandbox Code Playgroud)\n