Mar*_*iño 25 python logging audit-logging python-logging fastapi
我们正在使用 Python FastAPI 编写一个 Web 服务,该服务将托管在 Kubernetes 中。出于审计目的,我们需要保存特定路由的request
/的原始 JSON 正文。JSON的主体大小约为1MB,最好这不应该影响响应时间。我们怎样才能做到这一点?response
request
response
Chr*_*ris 41
你可以使用一个Middleware
. Amiddleware
接受传入应用程序的每个请求,因此允许您request
在任何特定端点处理该请求之前处理 , 以及response
在将其返回给客户端之前处理 , 。要创建,您可以在函数之上middleware
使用装饰器,如下所示。@app.middleware("http")
由于您需要使用或 来使用middleware
\xe2\x80\x94内的流中的请求正文,如本答案所示(在幕后,前一个方法实际上调用后者,请参见此处)\xe2\x80\x94然后它当您稍后将其传递到相应的端点时,该信息将不可用。因此,您可以按照本文中描述的方法使请求正文可用(即使用下面的函数)。更新:此问题现已修复,因此,如果您使用 FastAPI 0.108.0 或更高版本,则无需使用该解决方法。request.body()
request.stream()
request
set_body()
至于正文,您可以使用与此答案response
中描述的相同方法来消耗正文,然后将其返回给客户端。上述链接答案中描述的任一选项都可以使用;然而,下面的代码使用选项 2,它将主体存储在 bytes 对象中并直接返回自定义值(以及原始 的,和)。response
Response
status_code
headers
media_type
response
要记录数据,您可以使用 a BackgroundTask
,如此答案、此答案和此答案中所述。ABackgroundTask
仅在发送响应后运行(另请参阅Starlette 文档);因此,客户端在接收之前不必等待日志记录完成response
(因此,响应时间不会受到明显影响)。
如果您的流媒体request
或response
主体不适合服务器的 RAM(例如,想象运行 8GB RAM 的计算机上有 100GB 的主体),那么当您存储数据时,就会出现问题RAM 没有足够的空间来容纳累积的数据。另外,如果数据很大response
(例如,很大FileResponse
或StreamingResponse
),您可能会在客户端(或在反向代理端,如果您使用的是反向代理端)遇到Timeout
错误,因为您将无法响应客户端,直到您阅读整个响应正文(当您循环遍历时response.body_iterator
)。您提到“请求和响应 JSON 的主体大小约为 1MB”;因此,这通常应该没问题(但是,事先考虑一些事项始终是一个好习惯,例如您的 API 预计将同时服务多少个请求、哪些其他应用程序可能正在使用 RAM 等,以便确定这是否是一个问题)。如果需要,您可以使用SlowAPI (如本答案所示)来限制对 API 端点的请求数量。
middleware
仅限制特定路由的使用您可以通过以下方式限制对特定端点的使用middleware
:
request.url.path
根据您想要记录\nrequest
和 的预定义路由列表检查中间件内部response
,如本答案中所述(请参阅\n“更新”部分),APIRoute
所示。from fastapi import FastAPI, APIRouter, Response, Request\nfrom starlette.background import BackgroundTask\nfrom fastapi.routing import APIRoute\nfrom starlette.types import Message\nfrom typing import Dict, Any\nimport logging\n\n\napp = FastAPI()\nlogging.basicConfig(filename=\'info.log\', level=logging.DEBUG)\n\n\ndef log_info(req_body, res_body):\n logging.info(req_body)\n logging.info(res_body)\n\n\n# not needed when using FastAPI>=0.108.0.\nasync def set_body(request: Request, body: bytes):\n async def receive() -> Message:\n return {\'type\': \'http.request\', \'body\': body}\n request._receive = receive\n\n\n@app.middleware(\'http\')\nasync def some_middleware(request: Request, call_next):\n req_body = await request.body()\n await set_body(request, req_body) # not needed when using FastAPI>=0.108.0.\n response = await call_next(request)\n \n res_body = b\'\'\n async for chunk in response.body_iterator:\n res_body += chunk\n \n task = BackgroundTask(log_info, req_body, res_body)\n return Response(content=res_body, status_code=response.status_code, \n headers=dict(response.headers), media_type=response.media_type, background=task)\n\n\n@app.post(\'/\')\ndef main(payload: Dict[Any, Any]):\n return payload\n
Run Code Online (Sandbox Code Playgroud)\n如果您想对请求正文执行一些验证\xe2\x80\x94例如,确保请求正文大小不超过某个值\xe2\x80\x94而不是使用request.body()
,您可以在以下位置处理正文一个块使用方法一次.stream()
,如下所示(类似于这个答案)。
@app.middleware(\'http\')\nasync def some_middleware(request: Request, call_next): \n req_body = b\'\'\n async for chunk in request.stream():\n req_body += chunk\n ...\n
Run Code Online (Sandbox Code Playgroud)\nAPIRoute
类您也可以使用类似于此处和此处\xe2\x80\x94 的自定义 APIRoute
类\xe2\x80\x94,其中除其他外,它允许您在应用程序处理主体之前对其进行操作,以及在其之前的主体返回给客户端。此选项还允许您将此类的使用限制为您希望的路由,因为只有 下的端点(即在下面的示例中)才会使用自定义类。request
response
APIRouter
router
APIRoute
应该注意的是,上面选项 1中“注释”部分提到的相同注释也适用于该选项。例如,如果您的 API 返回StreamingResponse
\xe2\x80\x94,如下/video
例所示,该路径正在从在线源流式传输视频文件(可以在此处找到用于测试此功能的公共视频,您甚至可以使用更长的视频比下面使用的视频更清楚地看到效果)\xe2\x80\x94如果你的服务器的 RAM 无法处理,你可能会在服务器端遇到问题,以及客户端的延迟(和反向代理服务器(如果使用反向代理服务器),因为整个(流式)响应在返回给客户端之前被读取并存储在 RAM 中(如前所述)。StreamingResponse
在这种情况下,您可以排除从自定义类返回 a 的端点APIRoute
,并将其使用限制为所需的路由\xe2\x80\x94,特别是,如果它是一个大视频文件,甚至是不太可能的实时视频将其存储在日志\xe2\x80\x94中很有意义,只需不使用此类端点的@<name_of_router>
装饰器(即, @router
在下面的示例中),而是使用@<name_of_app>
装饰器(即,@app
在下面的示例中)或其他一些APIRouter
或子应用程序。
from fastapi import FastAPI, APIRouter, Response, Request\nfrom starlette.background import BackgroundTask\nfrom starlette.responses import StreamingResponse\nfrom fastapi.routing import APIRoute\nfrom starlette.types import Message\nfrom typing import Callable, Dict, Any\nimport logging\nimport httpx\n\n\ndef log_info(req_body, res_body):\n logging.info(req_body)\n logging.info(res_body)\n\n \nclass LoggingRoute(APIRoute):\n def get_route_handler(self) -> Callable:\n original_route_handler = super().get_route_handler()\n\n async def custom_route_handler(request: Request) -> Response:\n req_body = await request.body()\n response = await original_route_handler(request)\n tasks = response.background\n \n if isinstance(response, StreamingResponse):\n res_body = b\'\'\n async for item in response.body_iterator:\n res_body += item\n \n task = BackgroundTask(log_info, req_body, res_body)\n response = Response(content=res_body, status_code=response.status_code, \n headers=dict(response.headers), media_type=response.media_type)\n else:\n task = BackgroundTask(log_info, req_body, response.body)\n \n # check if the original response had background tasks already assigned to it\n if tasks:\n tasks.add_task(task) # add the new task to the tasks list\n response.background = tasks\n else:\n response.background = task\n \n return response\n \n return custom_route_handler\n\n\napp = FastAPI()\nrouter = APIRouter(route_class=LoggingRoute)\nlogging.basicConfig(filename=\'info.log\', level=logging.DEBUG)\n\n\n@router.post(\'/\')\ndef main(payload: Dict[Any, Any]):\n return payload\n\n\n@router.get(\'/video\')\ndef get_video():\n url = \'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4\'\n \n def gen():\n with httpx.stream(\'GET\', url) as r:\n for chunk in r.iter_raw():\n yield chunk\n\n return StreamingResponse(gen(), media_type=\'video/mp4\')\n\n\napp.include_router(router)\n
Run Code Online (Sandbox Code Playgroud)\n
您可以尝试像FastAPI官方文档中那样自定义APIRouter:
import time
from typing import Callable
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute
class TimedRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
before = time.time()
response: Response = await original_route_handler(request)
duration = time.time() - before
response.headers["X-Response-Time"] = str(duration)
print(f"route duration: {duration}")
print(f"route response: {response}")
print(f"route response headers: {response.headers}")
return response
return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@app.get("/")
async def not_timed():
return {"message": "Not timed"}
@router.get("/timed")
async def timed():
return {"message": "It's the time of my life"}
app.include_router(router)
Run Code Online (Sandbox Code Playgroud)
归档时间: |
|
查看次数: |
30019 次 |
最近记录: |