FastAPI返回大量JSON数据非常慢

cod*_*ird 5 python json dataframe pandas fastapi

我有一个 FastAPIGET端点,它返回大量 JSON 数据(约 160,000 行和 45 列)。毫不奇怪,使用返回数据非常json.dumps()慢。我首先使用文件中的数据读取数据json.loads(),并根据输入的参数对其进行过滤。有没有比使用更快的方法将数据返回给用户return data?以目前的状态,需要将近一分钟的时间。

我的代码目前如下所示:

# helper function to parse parquet file (where data is stored)
def parse_parquet(file_path):
    df = pd.read_parquet(file_path)
    result = df.to_json(orient = 'records')
    parsed = json.loads(result)
    return parsed
    

@app.get('/endpoint')
# has several more parameters
async def some_function(year = int | None = None, id = str | None = None):
    if year is None:
        data = parse_parquet(f'path/{year}_data.parquet')
    # no year
    if year is not None:
        data = parse_parquet(f'path/all_data.parquet')
    if id is not None:
        data = [d for d in data if d['id'] == id]
    return data
Run Code Online (Sandbox Code Playgroud)

Chr*_*ris 10

响应缓慢的原因之一是,在您的parse_parquet()方法中,您最初将文件转换为 JSON(使用df.to_json()),然后转换为字典(使用json.loads()),最后再次转换为 JSON,因为 FastAPI 在幕后自动转换返回的使用 ,将值转换为 JSON 兼容的数据jsonable_encoder,然后使用 Python 标准json.dumps()序列化 object\xe2\x80\x94a 过程,该过程非常慢(有关更多详细信息,请参阅此答案)。

\n

正如 @MatsLindh 在评论部分中所建议的,您可以使用替代 JSON 编码器,例如orjsonujosn(另请参阅此答案jsonable_encoder),与让 FastAPI 使用然后标准json.dumps()进行转换相比,这确实会加快该过程将数据转换为 JSON。然而,使用 pandasto_json()并直接返回自定义Response\xe2\x80\x94(如本答案选项 1更新 2 ) \xe2\x80\x94 中所述)似乎是性能最佳的解决方案。您可以使用下面给出的代码\xe2\x80\x94(它使用自定义\xe2\x80\x94)来比较所有可用解决方案的响应时间。APIRoute

\n

使用您自己的 parquet 文件或以下代码创建包含 160K 行和 45 列的示例 parquet 文件。

\n

创建镶木地板.py

\n
import pandas as pd\nimport numpy as np\n\ncolumns = [\'C\' + str(i) for i in range(1, 46)]\ndf = pd.DataFrame(data=np.random.randint(99999, 99999999, size=(160000,45)),columns=columns)\ndf.to_parquet(\'data.parquet\')\n
Run Code Online (Sandbox Code Playgroud)\n

运行下面的 FastAPI 应用程序并分别访问每个端点,以检查完成加载数据并将数据转换为 JSON 的过程所需的时间。

\n

应用程序.py

\n
from fastapi import FastAPI, APIRouter, Response, Request\nfrom fastapi.routing import APIRoute\nfrom typing import Callable\nimport pandas as pd\nimport json\nimport time\nimport ujson\nimport orjson\n\n\nclass TimedRoute(APIRoute):\n    def get_route_handler(self) -> Callable:\n        original_route_handler = super().get_route_handler()\n\n        async def custom_route_handler(request: Request) -> Response:\n            before = time.time()\n            response: Response = await original_route_handler(request)\n            duration = time.time() - before\n            response.headers["Response-Time"] = str(duration)\n            print(f"route duration: {duration}")\n            return response\n\n        return custom_route_handler\n\napp = FastAPI()\nrouter = APIRouter(route_class=TimedRoute)\n\n@router.get("/defaultFastAPIencoder")\ndef get_data_default():\n    df = pd.read_parquet(\'data.parquet\')   \n    return df.to_dict(orient="records")\n    \n@router.get("/orjson")\ndef get_data_orjson():\n    df = pd.read_parquet(\'data.parquet\')\n    return Response(orjson.dumps(df.to_dict(orient=\'records\')), media_type="application/json")\n\n@router.get("/ujson")\ndef get_data_ujson():\n    df = pd.read_parquet(\'data.parquet\')   \n    return Response(ujson.dumps(df.to_dict(orient=\'records\')), media_type="application/json")\n\n# Preferred way  \n@router.get("/pandasJSON")\ndef get_data_pandasJSON():\n    df = pd.read_parquet(\'data.parquet\')   \n    return Response(df.to_json(orient="records"), media_type="application/json")  \n\napp.include_router(router)\n
Run Code Online (Sandbox Code Playgroud)\n

尽管使用/pandasJSON上面的方法响应时间相当快(这应该是首选方式),但在浏览器上显示数据时可能会遇到一些延迟。然而,这与服务器端无关,而是与客户端有关,因为浏览器正在尝试显示大量数据。如果您不想显示数据,而是让用户将数据下载到他们的设备上(这会更快),您可以使用参数设置标Content-DispositionResponseattachment传递 a filename,指示浏览器应下载该文件。有关更多详细信息,请查看此答案此答案

\n
@router.get("/download")\ndef get_data():\n    df = pd.read_parquet(\'data.parquet\')\n    headers = {\'Content-Disposition\': \'attachment; filename="data.json"\'}\n    return Response(df.to_json(orient="records"), headers=headers, media_type=\'application/json\')\n
Run Code Online (Sandbox Code Playgroud)\n

我还应该提到,有一个名为 的库,Dask它可以处理大型数据集,如此处所述以防您必须处理需要很长时间才能完成的大量记录。与Pandas类似,你可以使用该.read_parquet()方法来读取文件。由于 Dask 似乎没有提供等效的.to_json()方法,您可以使用 将 Dask DataFrame 转换为 Pandas DataFrame df.compute(),然后使用 Pandasdf.to_json()将 DataFrame 转换为 JSON 字符串,并返回它,如上所示。

\n

我还建议您看一下这个答案,它提供了有关流式传输/返回 DataFrame 的详细信息和解决方案,以防您正在处理大量数据,将它们转换为 JSON(使用.to_json())或 CSV(使用.to_csv())可能如果您选择将输出字符串(JSON 或 CSV)存储到 RAM 中(如果您不将路径参数传递给上述函数,这是默认行为),则会导致服务器端出现内存问题\xe2\x80\ x94,因为也已经为原始 DataFrame 分配了大量内存。

\n