如何在 Python FastAPI 中记录原始 HTTP 请求/响应?

Mar*_*iño 25 python logging audit-logging python-logging fastapi

我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request/的原始 JSON 正文。JSON的主体大小约为1MB,最好这不应该影响响应时间。我们怎样才能做到这一点?responserequestresponse

Chr*_*ris 41

选项 1 - 使用中间件

\n

你可以使用一个Middleware. Amiddleware接受传入应用程序的每个请求,因此允许您request在任何特定端点处理该请求之前处理 , 以及response在将其返回给客户端之前处理 , 。要创建,您可以在函数之上middleware使用装饰器,如下所示。@app.middleware("http")

\n

由于您需要使用或 来使用middleware\xe2\x80\x94内的流中的请求正文,如本答案所示(在幕后,前一个方法实际上调用后者,请参见此处)\xe2\x80\x94然后它当您稍后将其传递到相应的端点时,该信息将不可用。因此,您可以按照本文中描述的方法使请求正文可用(即使用下面的函数)。更新:问题现已修复,因此,如果您使用 FastAPI 0.108.0 或更高版本,则无需使用该解决方法。request.body()request.stream()requestset_body()

\n

至于正文,您可以使用与此答案response中描述的相同方法来消耗正文,然后将其返回给客户端。上述链接答案中描述的任一选项都可以使用;然而,下面的代码使用选项 2,它将主体存储在 bytes 对象中并直接返回自定义值(以及原始 的,和)。responseResponsestatus_codeheadersmedia_typeresponse

\n

要记录数据,您可以使用 a BackgroundTask如此答案此答案此答案中所述。ABackgroundTask仅在发送响应后运行(另请参阅Starlette 文档);因此,客户端在接收之前不必等待日志记录完成response(因此,响应时间不会受到明显影响)。

\n

笔记

\n

如果您的流媒体requestresponse主体不适合服务器的 RAM(例如,想象运行 8GB RAM 的计算机上有 100GB 的主体),那么当您存储数据时,就会出现问题RAM 没有足够的空间来容纳累积的数据。另外,如果数据很大response(例如,很大FileResponseStreamingResponse),您可能会在客户端(或在反向代理端,如果您使用的是反向代理端)遇到Timeout错误,因为您将无法响应客户端,直到您阅读整个响应正文(当您循环遍历时response.body_iterator)。您提到“请求和响应 JSON 的主体大小约为 1MB”;因此,这通常应该没问题(但是,事先考虑一些事项始终是一个好习惯,例如您的 API 预计将同时服务多少个请求、哪些其他应用程序可能正在使用 RAM 等,以便确定这是否是一个问题)。如果需要,您可以使用SlowAPI (如本答案所示)来限制对 API 端点的请求数量。

\n

middleware仅限制特定路由的使用

\n

您可以通过以下方式限制对特定端点的使用middleware

\n
    \n
  • request.url.path根据您想要记录\nrequest和 的预定义路由列表检查中间件内部response,如本答案中所述(请参阅\n“更新”部分),
  • \n
  • 或使用子应用程序,如本答案所示\n
  • \n
  • 或使用自定义类,如下面的选项 2APIRoute所示。
  • \n
\n

工作示例

\n
from fastapi import FastAPI, APIRouter, Response, Request\nfrom starlette.background import BackgroundTask\nfrom fastapi.routing import APIRoute\nfrom starlette.types import Message\nfrom typing import Dict, Any\nimport logging\n\n\napp = FastAPI()\nlogging.basicConfig(filename=\'info.log\', level=logging.DEBUG)\n\n\ndef log_info(req_body, res_body):\n    logging.info(req_body)\n    logging.info(res_body)\n\n\n# not needed when using FastAPI>=0.108.0.\nasync def set_body(request: Request, body: bytes):\n    async def receive() -> Message:\n        return {\'type\': \'http.request\', \'body\': body}\n    request._receive = receive\n\n\n@app.middleware(\'http\')\nasync def some_middleware(request: Request, call_next):\n    req_body = await request.body()\n    await set_body(request, req_body)  # not needed when using FastAPI>=0.108.0.\n    response = await call_next(request)\n    \n    res_body = b\'\'\n    async for chunk in response.body_iterator:\n        res_body += chunk\n    \n    task = BackgroundTask(log_info, req_body, res_body)\n    return Response(content=res_body, status_code=response.status_code, \n        headers=dict(response.headers), media_type=response.media_type, background=task)\n\n\n@app.post(\'/\')\ndef main(payload: Dict[Any, Any]):\n    return payload\n
Run Code Online (Sandbox Code Playgroud)\n

如果您想对请求正文执行一些验证\xe2\x80\x94例如,确保请求正文大小不超过某个值\xe2\x80\x94而不是使用request.body(),您可以在以下位置处理正文一个块使用方法一次.stream(),如下所示(类似于这个答案)。

\n
@app.middleware(\'http\')\nasync def some_middleware(request: Request, call_next):    \n    req_body = b\'\'\n    async for chunk in request.stream():\n        req_body += chunk\n    ...\n
Run Code Online (Sandbox Code Playgroud)\n

选项 2 - 使用自定义APIRoute

\n

您也可以使用类似于此处此处\xe2\x80\x94 的自定义 APIRoute\xe2\x80\x94,其中除其他外,它允许您在应用程序处理主体之前对其进行操作,以及在其之前的主体返回给客户端。此选项还允许您将此类的使用限制为您希望的路由,因为只有 下的端点(即在下面的示例中)才会使用自定义类。requestresponseAPIRouterrouterAPIRoute

\n

应该注意的是,上面选项 1中“注释”部分提到的相同注释也适用于该选项。例如,如果您的 API 返回StreamingResponse\xe2\x80\x94,如下/video例所示,该路径正在从在线源流式传输视频文件(可以在此处找到用于测试此功能的公共视频,您甚至可以使用更长的视频比下面使用的视频更清楚地看到效果)\xe2\x80\x94如果你的服务器的 RAM 无法处理,你可能会在服务器端遇到问题,以及客户端的延迟(和反向代理服务器(如果使用反向代理服务器),因为整个(流式)响应在返回给客户端之前被读取并存储在 RAM 中(如前所述)。StreamingResponse在这种情况下,您可以排除从自定义类返回 a 的端点APIRoute,并将其使用限制为所需的路由\xe2\x80\x94,特别是,如果它是一个大视频文件,甚至是不太可能的实时视频将其存储在日志\xe2\x80\x94中很有意义,只需不使用此类端点的@<name_of_router>装饰器(即, @router在下面的示例中),而是使用@<name_of_app>装饰器(即,@app在下面的示例中)或其他一些APIRouter子应用程序

\n

工作示例

\n
from fastapi import FastAPI, APIRouter, Response, Request\nfrom starlette.background import BackgroundTask\nfrom starlette.responses import StreamingResponse\nfrom fastapi.routing import APIRoute\nfrom starlette.types import Message\nfrom typing import Callable, Dict, Any\nimport logging\nimport httpx\n\n\ndef log_info(req_body, res_body):\n    logging.info(req_body)\n    logging.info(res_body)\n\n       \nclass LoggingRoute(APIRoute):\n    def get_route_handler(self) -> Callable:\n        original_route_handler = super().get_route_handler()\n\n        async def custom_route_handler(request: Request) -> Response:\n            req_body = await request.body()\n            response = await original_route_handler(request)\n            tasks = response.background\n            \n            if isinstance(response, StreamingResponse):\n                res_body = b\'\'\n                async for item in response.body_iterator:\n                    res_body += item\n                  \n                task = BackgroundTask(log_info, req_body, res_body)\n                response = Response(content=res_body, status_code=response.status_code, \n                        headers=dict(response.headers), media_type=response.media_type)\n            else:\n                task = BackgroundTask(log_info, req_body, response.body)\n            \n            # check if the original response had background tasks already assigned to it\n            if tasks:\n                tasks.add_task(task)  # add the new task to the tasks list\n                response.background = tasks\n            else:\n                response.background = task\n                \n            return response\n            \n        return custom_route_handler\n\n\napp = FastAPI()\nrouter = APIRouter(route_class=LoggingRoute)\nlogging.basicConfig(filename=\'info.log\', level=logging.DEBUG)\n\n\n@router.post(\'/\')\ndef main(payload: Dict[Any, Any]):\n    return payload\n\n\n@router.get(\'/video\')\ndef get_video():\n    url = \'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4\'\n    \n    def gen():\n        with httpx.stream(\'GET\', url) as r:\n            for chunk in r.iter_raw():\n                yield chunk\n\n    return StreamingResponse(gen(), media_type=\'video/mp4\')\n\n\napp.include_router(router)\n
Run Code Online (Sandbox Code Playgroud)\n

  • @576i 我快速浏览了一下,代码按预期工作,即使引发“HTTPException”也是如此。 (2认同)

laz*_*ead 3

您可以尝试像FastAPI官方文档中那样自定义APIRouter

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute


class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            response: Response = await original_route_handler(request)
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=TimedRoute)


@app.get("/")
async def not_timed():
    return {"message": "Not timed"}


@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}


app.include_router(router)
Run Code Online (Sandbox Code Playgroud)


归档时间:

查看次数:

30019 次

最近记录:

1 年,6 月 前