Geo*_*eRF 4 python timeout fastapi
调用 的客户端微服务/do_something
在 request/post() 调用中超时为 60 秒。此超时是固定的且无法更改。因此,如果/do_something
需要 10 分钟,/do_something
就会浪费 CPU 资源,因为客户端微服务在 60 秒后不会等待 的响应/do_something
,这会浪费 CPU 10 分钟,从而增加成本。我们的预算有限。
当前的代码如下所示:
import time
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI
app = FastAPI()
def some_func(text):
"""
Some computationally heavy function
whose execution time depends on input text size
"""
randinteger = randrange(1,120)
time.sleep(randinteger)# simulate processing of text
return text
@app.get("/do_something")
async def do_something():
response = some_func(text="hello world")
return {"response": response}
# Running
if __name__ == '__main__':
server = Server(Config(app=app, host='0.0.0.0', port=3001))
server.run()
Run Code Online (Sandbox Code Playgroud)
这里/do_something
应该在 60 秒后停止处理当前对端点的请求,并等待下一个请求处理。
如果端点的执行在 60 秒后强制停止,我们应该能够使用自定义消息记录它。
这不应该终止服务并与多线程/多处理一起使用。
我试过这个。但是当超时发生时,服务器就会被杀死。有什么解决方案可以解决这个问题吗?
import logging
import time
import timeout_decorator
from uvicorn import Server, Config
from random import randrange
from fastapi import FastAPI
app = FastAPI()
@timeout_decorator.timeout(seconds=2, timeout_exception=StopIteration, use_signals=False)
def some_func(text):
"""
Some computationally heavy function
whose execution time depends on input text size
"""
randinteger = randrange(1,30)
time.sleep(randinteger)# simulate processing of text
return text
@app.get("/do_something")
async def do_something():
try:
response = some_func(text="hello world")
except StopIteration:
logging.warning(f'Stopped /do_something > endpoint due to timeout!')
else:
logging.info(f'( Completed < /do_something > endpoint')
return {"response": response}
# Running
if __name__ == '__main__':
server = Server(Config(app=app, host='0.0.0.0', port=3001))
server.run()
Run Code Online (Sandbox Code Playgroud)
这个答案不是关于改善CPU时间\xe2\x80\x94,正如您在评论部分\xe2\x80\x94中提到的那样,而是解释如果您使用正常def
或定义端点会发生什么async def
,并在运行阻塞时提供解决方案端点内的操作。
您询问如何在一段时间后停止处理请求,以便处理进一步的请求。开始处理请求,然后(60 秒后)停止它,就好像它从未发生过一样(一直浪费服务器资源并让其他请求等待),这实际上没有意义。相反,您应该让 FastAPI 框架本身来处理请求。当您使用 定义端点时,它会在主线程上运行(在事件循环中),即,只要端点内部没有调用,async def
服务器就会按顺序处理请求(就像您的情况一样)。该关键字将函数控制传递回事件循环。换句话说,它暂停周围协程的执行,并告诉事件循环让其他东西运行,直到ed 任务完成(并返回结果数据)。该关键字仅在函数内有效。await
await
await
await
async
由于您在端点内执行繁重的 CPU 密集型操作async def
(通过调用您的some_func()
函数),并且您永远不会放弃对事件循环中运行的其他请求的控制(例如,通过await
某些协程),因此服务器将被阻止并且等待该请求完全处理并完成,然后再继续下一个\xe2\x80\x94查看此答案以获取更多详细信息。
一种解决方案是使用正常def
而不是定义端点async def
。简而言之,当您使用普通def
而不是async def
在 FastAPI 中声明端点时,它会在外部线程池中运行,然后await
进行编辑,而不是直接调用(因为它会阻塞服务器);因此,FastAPI 仍然会异步工作。
另一个解决方案,如这个答案中所述,是保留async def
定义并在单独的线程中运行CPU密集型操作,并await
使用Starlette run_in_threadpool()
,从而确保运行协程的主线程(事件循环),不会被阻止。正如 @tiangolo此处所述,“run_in_threadpool
是一个可等待函数,第一个参数是普通函数,下一个参数直接传递给该函数。它支持序列参数和关键字参数”。例子:
from fastapi.concurrency import run_in_threadpool\n\nres = await run_in_threadpool(cpu_bound_task, text=\'Hello world\')\n
Run Code Online (Sandbox Code Playgroud)\n由于这是关于 CPU 密集型操作,因此最好使用 ,在单独的进程中运行它ProcessPoolExecutor
,如上面提供的链接中所述。在这种情况下,可以将其与 集成asyncio
,以便await
流程完成其工作并返回结果。请注意,如上面的链接所述,保护代码的主循环以避免递归生成子进程等非常重要\xe2\x80\x94 本质上,您的代码必须位于if __name__ == \'__main__\'
. 例子:
from fastapi.concurrency import run_in_threadpool\n\nres = await run_in_threadpool(cpu_bound_task, text=\'Hello world\')\n
Run Code Online (Sandbox Code Playgroud)\n关于您关于客户端具有固定 60 秒请求超时的问题的最新更新;如果您没有使用允许您设置请求超时的代理(例如 Nginx),和/或您没有使用gunicorn(它也允许您调整请求),timeout
您可以使用中间件(如此处建议的那样)为所有传入请求设置超时。建议的中间件(示例如下)使用 asyncio.wait_for()
函数,该函数等待可等待函数/协程完成并超时。如果发生超时,它将取消任务并引发asyncio.TimeoutError
。
关于您的评论如下:
\n\n\n我的要求不是解锁下一个请求......
\n
再次,请仔细阅读这个答案的第一部分,以了解如果您使用async def
而不是await
为内部的某些协程定义端点,而是执行一些 CPU 密集型任务(就像您已经做的那样),它将阻塞服务器直到完成(甚至下面的方法也不会按预期工作)。这就像说您希望 FastAPI 一次处理一个请求;在这种情况下,没有理由使用ASGI框架(例如 FastAPI),它利用async
/await
语法(即异步处理请求)来提供快速性能。因此,您需要async
从端点中删除定义(如前所述),或者最好使用 运行同步CPU 密集型任务ProcessPoolExecutor
,如前所述。
另外,您的评论some_func()
:
\n\n一些计算量大的函数,其执行时间取决于\n输入文本大小
\n
表示您可以检查输入文本的长度(例如,使用依赖函数HTTPException
),并在文本长度超过某个预定义值的情况下引发,而不是(或同时)设置请求超时,这事先已知需要60s以上才能完成处理。这样,您的系统就不会浪费资源来尝试执行您已经知道将无法完成的任务。
import concurrent.futures\nfrom functools import partial\nimport asyncio\n\nloop = asyncio.get_running_loop()\nwith concurrent.futures.ProcessPoolExecutor() as pool:\n res = await loop.run_in_executor(pool, partial(cpu_bound_task, text=\'Hello world\'))\n
Run Code Online (Sandbox Code Playgroud)\n
归档时间: |
|
查看次数: |
6784 次 |
最近记录: |