FastAPI 以串行方式而不是并行方式运行 api 调用

Lea*_*ers 50 python asynchronous concurrent-processing python-asyncio fastapi

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}
Run Code Online (Sandbox Code Playgroud)

如果我在本地主机上运行我的代码 - 例如http://localhost:8501/ping- 在同一浏览器窗口的不同选项卡中,我得到:

Hello
bye
Hello
bye
Run Code Online (Sandbox Code Playgroud)

代替:

Hello
Hello
bye
bye
Run Code Online (Sandbox Code Playgroud)

我已经阅读过有关使用的内容httpx,但仍然无法实现真正​​的并行化。有什么问题?

Chr*_*ris 128

根据FastAPI 的文档

\n
\n

def当您使用正常而不是\nof声明路径操作函数时,它会在然后\n edasync def的外部线程池中运行,而不是直接调用(因为它会阻塞\n服务器)。await

\n
\n

另外,如下所述

\n
\n

如果您使用的第三方库与某些内容(数据库、API、文件系统等)进行通信,并且不支持使用await,(目前大多数数据库库都是这种情况) ,然后将路径操作函数声明为\n正常情况下,只需def.

\n

如果您的应用程序(以某种方式)不需要与其他任何东西通信并等待它响应,请使用async def.

\n

如果您只是不知道,请使用普通def.

\n

注意:您可以根据需要在路径操作函数中混合使用def和,并使用最适合您的\n选项定义每个函数。async defFastAPI 会用它们做正确的事情。

\n

无论如何,在上述任何情况下,FastAPI仍然会同步工作并且速度非常快。

\n

但按照上述步骤,它将能够进行一些\n性能优化。

\n
\n

因此,defFastAPI 中的端点(在异步编程的上下文中,用 just 定义的函数称为def同步函数)在与外部线程池分开的线程中运行,await然后再进行编辑,因此,FastAPI 仍将异步工作。换句话说,服务器将同时处理对这些端点的请求。而async def端点直接event loop\xe2\x80\x94中运行,而\xe2\x80\x94在主(单)线程中运行\xe2\x80\x94,也就是说,服务器也将并发/异步处理对此类端点的请求,只要有一个await调用此类端点/路由内的非阻塞 I/O 绑定操作async def,例如等待(1) 通过网络发送来自客户端的数据,(2) 读取磁盘中文件的内容,(3)要完成的数据库操作等(请看这里)。但是,如果定义的端点async defawait用于内部的某些协程,则为了让出时间让其他任务运行event loop(例如,对相同或其他端点的请求、后台任务等),每个请求都会向此类在将控制权返回给event loop并允许其他任务运行之前,端点必须完全完成(即退出端点) 。也就是说,在这种情况下,服务器会按顺序处理请求。请注意,相同的概念也适用于使用正常定义的函数,这些函数def用作StreamingResponse\ 的生成器(请参阅StreamingResponse类实现)或Background Tasks(请参阅BackgroundTask类实现),这意味着 FastAPI 在幕后也将在单独的线程中运行此类函数相同的外部线程池。该外部线程池的默认工作线程数是40并且可以根据需要进行调整\xe2\x80\x94请查看此答案以了解如何执行此操作。因此,读完这个答案后,您应该能够决定是否应该使用 或 定义 FastAPI 端点、生成StreamingResponse器或后台任务函数。defasync def

\n

关键字await(仅在async def函数内有效)将函数控制传递回event loop. 换句话说,它暂停周围协程的执行(即协程对象是调用async def函数的结果),并告诉 edevent loop让其他任务运行,直到该awaited 任务完成。请注意async def,仅仅因为您可以使用然后await在端点内定义自定义函数async def,并不意味着您的代码将异步工作,例如,如果该自定义函数包含对time.sleep()、CPU 绑定任务、非异步 I/O 库,或与异步 Python 代码不兼容的任何其他阻塞调用。以FastAPI为例,当使用、 等async方法时,FastAPI/Starlette在幕后实际上是从前面介绍的外部线程池中的一个单独的线程中调用相应的同步File方法(使用函数s它;否则,此类方法/操作将阻止\xe2\x80\x94,您可以通过查看该类的实现了解更多信息。UploadFileawait file.read()await file.write() async run_in_threadpool()awaitevent loopUploadFile

\n

注意async不是指并行,而是并发。如前所述,使用async和的异步代码await很多时候被概括为使用协程。协程是协作的(或协作多任务),这意味着“在任何给定时间,带有协程的程序运行其中一个协程,并且该正在运行的协程仅在明确请求暂停时才暂停其执行”(请参阅​​此处此处有关协程的更多信息)。正如本文所述:

\n
\n

具体来说,每当当前运行的协程的执行到达\nawait表达式时,该协程可能会被挂起,而其他先前挂起的协程可能会恢复执行(如果它被挂起的内容已返回一个值)。async for当块从同步迭代器请求下一个值或async with进入或退出块时(因为这些操作await在幕后使用),也可能会发生挂起。

\n
\n

但是,如果在函数/端点内直接执行/调用阻塞 I/O 密集型或 CPU 密集型操作async def,则会阻塞事件循环,从而阻塞主线程(event loop在某个函数的主线程中运行)流程/工人)。因此,诸如端点time.sleep()中的阻塞操作async def会阻塞整个服务器(如问题中提供的代码示例所示)。因此,如果您的端点不打算进行任何async调用,您可以使用普通来声明它def,在这种情况下,FastAPI 会在与外部线程池不同的单独线程中运行它await,如前所述(更多解决方案在以下部分)。例子:

\n
@app.get("/ping")\ndef ping(request: Request):\n    #print(request.client)\n    print("Hello")\n    time.sleep(5)\n    print("bye")\n    return "pong"\n
Run Code Online (Sandbox Code Playgroud)\n

否则,如果您必须在端点内执行的函数是async您必须执行的函数await,则您应该使用 定义您的端点async def。为了演示这一点,下面的示例使用该asyncio.sleep()函数(来自asyncio库),它提供非阻塞睡眠操作。该await asyncio.sleep()方法将暂停周围协程的执行(直到睡眠操作完成),从而允许其中的其他任务event loop运行。这里这里也给出了类似的例子。

\n
import asyncio\n \n@app.get("/ping")\nasync def ping(request: Request):\n    #print(request.client)\n    print("Hello")\n    await asyncio.sleep(5)\n    print("bye")\n    return "pong"\n
Run Code Online (Sandbox Code Playgroud)\n

如果两个请求同时到达(大约)\xe2\x80\x94,则上面的两个端点将以与问题中提到的相同顺序将指定的消息打印到屏幕\xe2\x80\x94,即:

\n
Hello\nHello\nbye\nbye\n
Run Code Online (Sandbox Code Playgroud)\n

重要的提示

\n

当使用浏览器第二次(第三次等)调用同一端点时,请记住从与浏览器主会话隔离的选项卡中执行此操作;否则,后续请求(即第一个请求之后的请求)可能会被浏览器(在客户端)阻止,因为浏览器在发送下一个请求之前可能正在等待服务器对上一个请求的响应。至少对于 Chrome 浏览器来说,这是一种常见行为,因为在再次请求相同资源之前,需要等待查看请求结果并检查结果是否可以缓存。

\n

您可以通过print(request.client)在端点内部使用来确认这一点,您将在其中看到所有传入请求的hostnameport编号相同\xe2\x80\x94,以防请求是从同一浏览器窗口/会话中打开的选项卡发起的;否则,port每个请求的数字通常会不同\xe2\x80\x94,因此,这些请求将由服务器顺序处理,因为浏览器/客户端首先顺序发送它们。为了克服这个问题,您可以:

\n
    \n
  1. 重新加载同一选项卡(与正在运行的选项卡相同),或者

    \n
  2. \n
  3. 在隐身窗口中打开新选项卡,或者

    \n
  4. \n
  5. 使用不同的浏览器/客户端发送请求,或者

    \n
  6. \n
  7. 使用该httpx发出异步 HTTP 请求,以及可等待 ,它允许同时执行多个异步操作,然后按照可等待(任务)传递给该函数的顺序asyncio.gather()返回结果列表(请查看答案更多细节)。

    \n

    例子

    \n
    Hello\nHello\nbye\nbye\n
    Run Code Online (Sandbox Code Playgroud)\n

    如果您必须调用可能需要不同时间来处理请求的不同端点,并且您希望在从服务器\xe2\x80\x94返回后立即在客户端打印响应,而不是等待asyncio.gather()收集所有任务的结果,并按照任务传递给send()函数\xe2\x80\x94的相同顺序打印出来,您可以将send()上面示例的函数替换为下面所示的函数:

    \n
    async def send(url, client):\n    res = await client.get(url, timeout=10)\n    print(res.json())\n    return res\n
    Run Code Online (Sandbox Code Playgroud)\n
  8. \n
\n

Async/await和阻止 I/O 密集型或 CPU 密集型操作

\n

如果您需要定义一个端点async def(就像您可能需要await为其中的某些协程定义端点一样),但也有一些同步阻塞 I/O 密集型或 CPU 密集型操作(计算密集型任务),这些操作会阻塞event loop(本质上,整个服务器)并且不会让其他请求通过,例如:

\n
import httpx\nimport asyncio\n\nURLS = [\'http://127.0.0.1:8000/ping\'] * 2\n\nasync def send(url, client):\n    return await client.get(url, timeout=10)\n\nasync def main():\n    async with httpx.AsyncClient() as client:\n        tasks = [send(url, client) for url in URLS]\n        responses = await asyncio.gather(*tasks)\n        print(*[r.json() for r in responses], sep=\'\\n\')\n\nasyncio.run(main())\n
Run Code Online (Sandbox Code Playgroud)\n

然后:

\n
    \n
  1. 您应该检查是否可以将端点的定义更改为正常def而不是async def. 例如,如果端点中必须等待的唯一方法是读取文件内容的方法(正如您在下面的注释部分中提到的),您可以将端点参数的类型声明为(即bytesfile: bytes = File()),因此,FastAPI 会为您读取该文件,您将收到bytes. 因此,没有必要使用await file.read(). 请注意,上述方法应该适用于小文件,因为整个文件内容将存储到内存中(请参阅有关参数的文档File);因此,如果您的系统没有足够的 RAM 来容纳累积的数据(例如,如果您有 8GB RAM,您可以\xe2\x80\x99t 加载 50GB 文件),您的应用程序最终可能会崩溃。或者,您可以直接调用.read()方法SpooledTemporaryFile(可以通过对象.file的属性访问UploadFile),这样您就不必使用await方法.read()\xe2\x80\x94,并且现在可以使用正常声明端点def,每个请求将在单独的线程中运行(示例如下)。有关如何上传File以及 Starlette/FastAPI 如何SpooledTemporaryFile在幕后使用的更多详细信息,请查看此答案此答案

    \n
    async def send(url, client):\n    res = await client.get(url, timeout=10)\n    print(res.json())\n    return res\n
    Run Code Online (Sandbox Code Playgroud)\n
  2. \n
  3. 使用模块 \xe2\x80\x94中的FastAPI\'s (Starlette\'s)run_in_threadpool()函数,正如 @tiangolo这里建议的\xe2\x80\x94 ,“将在单独的线程中运行该函数,以确保主线程(其中协程是run)不会被阻止”(参见此处)。正如 @tiangolo这里所描述的,“是一个可以的函数;第一个参数是一个普通函数,以下参数直接传递给该函数。它支持序列参数和关键字参数”。concurrencyrun_in_threadpoolawait

    \n
    @app.post("/ping")\nasync def ping(file: UploadFile = File(...)):\n    print("Hello")\n    try:\n        contents = await file.read()\n        res = cpu_bound_task(contents)  # this will block the event loop\n    finally:\n        await file.close()\n    print("bye")\n    return "pong"\n
    Run Code Online (Sandbox Code Playgroud)\n
  4. \n
  5. 或者,在使用\ xe2\x80\x94 获得运行后,使用asyncio\'s \xe2\x80\x94 来运行任务,在这种情况下,您可以让它完成并返回结果,然后再继续下一行代码。传递给executor参数,将使用默认的执行器;这是一个:loop.run_in_executor()event loopasyncio.get_running_loop()awaitNoneThreadPoolExecutor

    \n
    @app.post("/ping")\ndef ping(file: UploadFile = File(...)):\n    print("Hello")\n    try:\n        contents = file.file.read()\n        res = cpu_bound_task(contents)\n    finally:\n        file.file.close()\n    print("bye")\n    return "pong"\n
    Run Code Online (Sandbox Code Playgroud)\n

    或者,如果您想传递关键字参数,则可以使用lambda表达式(例如lambda: cpu_bound_task(some_arg=contents)),或者最好使用functools.partial(),文档中特别建议使用loop.run_in_executor()

    \n
    from fastapi.concurrency import run_in_threadpool\n\nres = await run_in_threadpool(cpu_bound_task, contents)\n
    Run Code Online (Sandbox Code Playgroud)\n

    在 Python 3.9+ 中,您还可以asyncio.to_thread()在单​​独的线程\xe2\x80\x94 中异步运行同步函数,本质上,它在幕后使用await loop.run_in_executor(None, func_call)如. 该函数采用要执行的阻塞函数的名称以及该函数的任何参数(和/或),然后返回一个可以编辑的协程。例子:asyncio.to_thread()to_thread()*args**kwargsawait

    \n
    import asyncio\n\nres = await asyncio.to_thread(cpu_bound_task, contents)\n
    Run Code Online (Sandbox Code Playgroud)\n

    请注意,正如此答案中所解释的,传递Noneexecutor参数不会ThreadPoolExecutor在每次调用时创建新的await loop.run_in_executor(None, ...),而是重新使用具有默认数量的工作线程(即)的默认执行器。因此,根据您的应用程序的要求,该数字可能会相当低。在这种情况下,您应该使用自定义的. 例如:min(32, os.cpu_count() + 4)ThreadPoolExecutor

    \n
    import asyncio\n\nloop = asyncio.get_running_loop()\nres = await loop.run_in_executor(None, cpu_bound_task, contents)\n
    Run Code Online (Sandbox Code Playgroud)\n

    我强烈建议您查看上面的链接答案,以了解 usingrun_in_threadpool()和之间的区别run_in_executor(),以及如何ThreadPoolExecutor在应用程序启动时创建可重用的自定义,并根据需要调整最大工作线程数。

    \n
  6. \n
  7. ThreadPoolExecutor将成功防止event loop被阻止,但不会给您带来并行运行代码所期望的性能改进;尤其是当需要执行任务时,例如此处描述的任务(例如,音频或图像处理、机器学习等)。因此,最好使用 ,在单独的进程\xe2\x80\x94中运行 CPU 密集型任务,如下所示\xe2\x80\x94,您可以再次与 集成,以便它完成工作并返回结果(s)。如此处所述,保护程序的入口点以避免递归生成子进程等非常重要。基本上,您的代码必须位于.CPU-boundProcessPoolExecutorasyncioawaitif __name__ == \'__main__\'

    \n
    import asyncio\nfrom functools import partial\n\nloop = asyncio.get_running_loop()\nres = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))\n
    Run Code Online (Sandbox Code Playgroud)\n

    再次,我建议您先查看一下链接的答案,了解如何ProcessPoolExecutor在应用程序启动时创建可重用的内容。您可能会发现这个答案也很有帮助。

    \n
  8. \n
  9. 使用更多的工作线程来利用多核 CPU,以便并行运行多个进程并能够服务更多的请求。例如,uvicorn main:app --workers 4(如果您使用Gunicorn 作为 Uvicorn 工作人员的流程管理器,请查看此答案)。当使用 1 个工作线程时,仅运行一个进程。当使用多个工作线程时,这将产生多个进程(都是单线程的)。每个进程都有一个单独的全局解释器锁(GIL),以及自己的event loop,它运行在每个进程的主线程中,并执行其线程中的所有任务。这意味着,只有一个线程可以锁定每个进程的解释器;当然,除非您在 外部或内部使用额外的线程event loop,例如,当使用ThreadPoolExecutorwith时loop.run_in_executor,或使用普通而不是定义端点/后台任务/ StreamingResponse\ 的生成器时,以及调用\ 的方法时(请参阅有关更多详细信息,请参阅此答案的前两段)。defasync defUploadFile

    \n

    注意:每个worker “都有自己的东西、变量和内存”。这意味着global变量/对象等不会在进程/工作人员之间共享。在这种情况下,您应该考虑使用数据库存储或键值存储(缓存),如此处此处所述。此外,请注意“如果您在代码中消耗大量内存,则每个进程将消耗等量的内存”。

    \n
  10. \n
  11. 如果您需要执行繁重的后台计算,并且不一定需要它由同一进程运行(例如,您不需要共享内存、变量等),那么您可能会受益于使用其他更大的计算像Celery这样的工具,如FastAPI 文档中所述。

    \n
  12. \n
\n

  • 可惜的是,在fastAPI官方文档中却找不到这么好的答案,官方文档并没有把这些描述清楚。这将为很多人节省大量时间 (3认同)

归档时间:

查看次数:

42598 次

最近记录:

1 年,8 月 前