如何在指定时间后停止执行 FastAPI 端点以减少 CPU 资源使用/成本?

Geo*_*eRF 4 python timeout fastapi

使用案例

调用 的客户端微服务/do_something在 request/post() 调用中超时为 60 秒。此超时是固定的且无法更改。因此,如果/do_something需要 10 分钟,/do_something就会浪费 CPU 资源,因为客户端微服务在 60 秒后不会等待 的响应/do_something,这会浪费 CPU 10 分钟,从而增加成本。我们的预算有限。

当前的代码如下所示:

import time
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()

def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,120)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    response = some_func(text="hello world")
    return {"response": response}

# Running
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()
Run Code Online (Sandbox Code Playgroud)

所需的解决方案

  1. 这里/do_something应该在 60 秒后停止处理当前对端点的请求,并等待下一个请求处理。

  2. 如果端点的执行在 60 秒后强制停止,我们应该能够使用自定义消息记录它。

  3. 这不应该终止服务并与多线程/多处理一起使用。

我试过这个。但是当超时发生时,服务器就会被杀死。有什么解决方案可以解决这个问题吗?

import logging
import time
import timeout_decorator
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI

app = FastAPI()


@timeout_decorator.timeout(seconds=2, timeout_exception=StopIteration, use_signals=False)
def some_func(text):
    """
    Some computationally heavy function
    whose execution time depends on input text size
    """
    randinteger = randrange(1,30)
    time.sleep(randinteger)# simulate processing of text
    return text


@app.get("/do_something")
async def do_something():
    try:
        response = some_func(text="hello world")
    except StopIteration:
        logging.warning(f'Stopped /do_something > endpoint due to timeout!')
    else:
        logging.info(f'(  Completed < /do_something > endpoint')

    return {"response": response}


# Running 
if __name__ == '__main__':
    server = Server(Config(app=app, host='0.0.0.0', port=3001))
    server.run()
Run Code Online (Sandbox Code Playgroud)

Chr*_*ris 6

这个答案不是关于改善CPU时间\xe2\x80\x94,正如您在评论部分\xe2\x80\x94中提到的那样,而是解释如果您使用正常def或定义端点会发生什么async def,并在运行阻塞时提供解决方案端点内的操作。

\n

您询问如何在一段时间后停止处理请求,以便处理进一步的请求。开始处理请求,然后(60 秒后)停止它,就好像它从未发生过一样(一直浪费服务器资源并让其他请求等待),这实际上没有意义。相反,您应该让 FastAPI 框架本身来处理请求。当您使用 定义端点时,它会在主线程上运行(在事件循环中),即,只要端点内部没有调用,async def服务器就会按顺序处理请求(就像您的情况一样)。该关键字将函数控制传递回事件循环。换句话说,它暂停周围协程的执行,并告诉事件循环让其他东西运行,直到ed 任务完成(并返回结果数据)。该关键字仅在函数内有效。awaitawaitawaitawaitasync

\n

由于您在端点内执行繁重的 CPU 密集型操作async def(通过调用您的some_func()函数),并且您永远不会放弃对事件循环中运行的其他请求的控制(例如,通过await某些协程),因此服务器将被阻止并且等待该请求完全处理并完成,然后再继续下一个\xe2\x80\x94查看此答案以获取更多详细信息。

\n

解决方案

\n

一种解决方案是使用正常def而不是定义端点async def。简而言之,当您使用普通def而不是async def在 FastAPI 中声明端点时,它会在外部线程池中运行,然后await进行编辑,而不是直接调用(因为它会阻塞服务器);因此,FastAPI 仍然会异步工作

\n

另一个解决方案,如这个答案中所述,是保留async def定义并在单独的线程中运行CPU密集型操作,并await使用Starlette run_in_threadpool(),从而确保运行协程的主线程(事件循环),不会被阻止。正如 @tiangolo此处所述,“run_in_threadpool是一个可等待函数,第一个参数是普通函数,下一个参数直接传递给该函数。它支持序列参数和关键字参数”。例子:

\n
from fastapi.concurrency import run_in_threadpool\n\nres = await run_in_threadpool(cpu_bound_task, text=\'Hello world\')\n
Run Code Online (Sandbox Code Playgroud)\n

由于这是关于 CPU 密集型操作,因此最好使用 ,在单独的进程中运行它ProcessPoolExecutor,如上面提供的链接中所述。在这种情况下,可以将其与 集成asyncio,以便await流程完成其工作并返回结果。请注意,如上面的链接所述,保护代码的主循环以避免递归生成子进程等非常重要\xe2\x80\x94 本质上,您的代码必须位于if __name__ == \'__main__\'. 例子:

\n
from fastapi.concurrency import run_in_threadpool\n\nres = await run_in_threadpool(cpu_bound_task, text=\'Hello world\')\n
Run Code Online (Sandbox Code Playgroud)\n

关于请求超时

\n

关于您关于客户端具有固定 60 秒请求超时的问题的最新更新;如果您没有使用允许您设置请求超时的代理(例如 Nginx),和/或您没有使用gunicorn(它也允许您调整请求),timeout您可以使用中间件(如此处建议的那样)为所有传入请求设置超时。建议的中间件(示例如下)使用 asyncio.wait_for()函数,该函数等待可等待函数/协程完成并超时。如果发生超时,它将取消任务并引发asyncio.TimeoutError

\n

关于您的评论如下:

\n
\n

我的要求不是解锁下一个请求......

\n
\n

再次,请仔细阅读这个答案的第一部分,以了解如果您使用async def而不是await为内部的某些协程定义端点,而是执行一些 CPU 密集型任务(就像您已经做的那样),它将阻塞服务器直到完成(甚至下面的方法也不会按预期工作)。这就像说您希望 FastAPI 一次处理一个请求;在这种情况下,没有理由使用ASGI框架(例如 FastAPI),它利用async/await语法(即异步处理请求)来提供快速性能。因此,您需要async从端点中删除定义(如前所述),或者最好使用 运行同步CPU 密集型任务ProcessPoolExecutor,如前所述。

\n

另外,您的评论some_func()

\n
\n

一些计算量大的函数,其执行时间取决于\n输入文本大小

\n
\n

表示您可以检查输入文本的长度(例如,使用依赖函数HTTPException),并在文本长度超过某个预定义值的情况下引发,而不是(或同时)设置请求超时,这事先已知需要60s以上才能完成处理。这样,您的系统就不会浪费资源来尝试执行您已经知道将无法完成的任务。

\n

工作示例

\n
import concurrent.futures\nfrom functools import partial\nimport asyncio\n\nloop = asyncio.get_running_loop()\nwith concurrent.futures.ProcessPoolExecutor() as pool:\n    res = await loop.run_in_executor(pool, partial(cpu_bound_task, text=\'Hello world\'))\n
Run Code Online (Sandbox Code Playgroud)\n

  • 谢谢。但我的用例/要求与您描述的不同。我的要求不是解锁下一个请求,而是在客户端超时时通过停止端点处理来降低成本。我编辑了原文的第一段以更好地解释它。 (3认同)