我已经使用异步实现了所有路线。并遵循 FastAPI 文档中的所有准则。
每个路由都有多个数据库调用,没有异步支持,所以它们是这样的正常功能
def db_fetch(query):
# I take a few seconds to respond
return
Run Code Online (Sandbox Code Playgroud)
为了避免阻塞我的事件循环,我使用fastapi.concurrancy.run_in_threadpool
现在的问题是,当大量请求到来时,我的新请求就会被阻止。即使我关闭浏览器选项卡(取消请求),整个应用程序也会卡住,直到处理旧的请求为止。
我在这里做错了什么?
我用它uvicorn作为我的 ASGI 服务器。我在具有 2 个副本的 kubernetes 集群中运行。
很少有人怀疑:我是否产生了太多线程?这是 uvicron 中的一些错误吗?不太确定!
我可能没有正确理解 FastAPI 中的异步概念。
我同时从两个客户端访问以下应用程序的根端点。我希望 FastAPIStarted在执行开始时连续打印两次:
from fastapi import FastAPI
import asyncio
app = FastAPI()
@app.get("/")
async def read_root():
print('Started')
await asyncio.sleep(5)
print('Finished')
return {"Hello": "World"}
Run Code Online (Sandbox Code Playgroud)
相反,我得到以下内容,它看起来非常非异步:
Started
Finished
INFO: ('127.0.0.1', 49655) - "GET / HTTP/1.1" 200
Started
Finished
INFO: ('127.0.0.1', 49655) - "GET / HTTP/1.1" 200
Run Code Online (Sandbox Code Playgroud)
我错过了什么?
这是我的小fastapi应用程序:
from datetime import datetime
import asyncio
import uvicorn
from fastapi import FastAPI
app = FastAPI()
@app.get("/delayed")
async def get_delayed():
started = datetime.now()
print(f"Starting at: {started}")
await asyncio.sleep(10)
ended = datetime.now()
print(f"Ending at: {ended}")
return {"started": f"{started}", "ended": f"{ended}"}
if __name__ == "__main__":
uvicorn.run("fastapitest.main:app", host="0.0.0.0", port=8000, reload=True, workers=2)
Run Code Online (Sandbox Code Playgroud)
当我连续两次调用它时,第二个函数中的代码直到第一个请求完成后才开始执行,产生如下输出:
Starting at: 2021-09-17 14:52:40.317915
Ending at: 2021-09-17 14:52:50.321557
INFO: 127.0.0.1:58539 - "GET /delayed HTTP/1.1" 200 OK
Starting at: 2021-09-17 14:52:50.328359
Ending at: 2021-09-17 14:53:00.333032
INFO: 127.0.0.1:58539 - "GET /delayed …Run Code Online (Sandbox Code Playgroud) 我正在使用 EC2 AWS 实例中托管的 FastAPI。
EC2 实例规格(g4dn.xlarge):16 GB 内存,4 个 CPU,GPU:NVIDIA T4
我正在对该应用程序进行压力测试。如果我向应用程序同时发送 10 个 POST 请求,则只有 5 个请求同时(完全相同的时间)得到处理,而其他请求则在不同的时间(几秒钟后)一一处理。
配置如下
gunicorn main:app --workers 9 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:80
Run Code Online (Sandbox Code Playgroud)
workers根据这个公式,我已将 9 设置为 9The suggested maximum concurrent requests when using workers and threads is (2*CPU)+1.
鉴于上述配置,我希望能够同时处理所有 10 个请求。
对于 9 个工作人员,即使我清理了缓存,我也会遇到 CUDA 内存不足错误。
有 3 个工作人员时,我不会遇到 CUDA 问题,但同时处理的请求只有 3 个,仅此而已。其余的将在几秒钟后一一处理。
为什么向 Fastapi 函数添加 async 会出现“‘coroutine’对象不可迭代”错误
当我使用 Swagger UI 调用函数/端点时,只有在函数前面添加 async 关键字后才会出现错误,如下所示:
@router.post("/create")
async def job_create_post_view(
request: Request,
is_htmx=Depends(is_htmx),
db:Session=Depends(get_db),
short_description: str = Form(default=None),
long_description: str = Form(default=None),
.....
job_image:Optional[UploadFile]=File(...)
):
Run Code Online (Sandbox Code Playgroud)
错误如下:
[TypeError("'coroutine' object is not iterable"), TypeError('vars() argument must have __dict__ attribute')]
Run Code Online (Sandbox Code Playgroud)
我试图在函数内做一些异步的事情:
contents = await job_image.read()
Run Code Online (Sandbox Code Playgroud)
这是堆栈跟踪:
Traceback (most recent call last):
File "D:\TEMP\job_search - revert\venv\lib\site-packages\uvicorn\protocols\http\h11_impl.py", line 366, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "D:\TEMP\job_search - revert\venv\lib\site-packages\uvicorn\middleware\proxy_headers.py", line 75, in __call__
return await self.app(scope, receive, send) …Run Code Online (Sandbox Code Playgroud) 当 FastAPI 端点发生异常时,我尝试使用后台任务生成日志:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(message=""):
with open("log.txt", mode="w") as email_file:
content = f"{message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
if "hello" in email:
background_tasks.add_task(write_notification, message="helloworld")
raise HTTPException(status_code=500, detail="example error")
background_tasks.add_task(write_notification, message="hello world.")
return {"message": "Notification sent in the background"}
Run Code Online (Sandbox Code Playgroud)
但是,不会生成日志,因为根据此处和此处的文档,后台任务“仅”在return执行语句后运行。
有什么解决方法吗?
在FastAPI中,run_in_executor两者run_in_threadpool都可以让函数在其他线程中运行,并且似乎具有相同的行为。
但这有什么区别呢?使用 FastAPI 的最佳选择是什么?
演示:
import asyncio
import time
from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool
app = FastAPI()
@app.get("/")
async def index():
result = await long_time_work()
result = await long_time_work2()
return {"message": result}
async def long_time_work():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, time.sleep, 5)
return True
async def long_time_work2():
await run_in_threadpool(lambda: time.sleep(5))
return True
Run Code Online (Sandbox Code Playgroud) 我试图了解新的异步协同程序(在Python 3.5中引入).
1997年,我参加了大学课程,大致涵盖了Andrew Tanenbaum 所着的" 现代操作系统 "一书的内容.
不知何故await,Python3让我想起了Cooperative Multitasking.
来自维基百科:
协作式多任务处理,也称为非抢占式多任务处理,是一种计算机多任务处理方式,其中操作系统永远不会启动从正在运行的进程到另一个进程的上下文切换.相反,进程会定期或在空闲时自动产生控制,以便能够同时运行多个应用程序.这种类型的多任务处理称为"协作",因为所有程序必须合作才能使整个调度方案起作用.
如果你像操作系统一样看Python解释器,"合作多任务"一词是否适用于await?
但可能是我错过了一些东西.
好的,所以我正在对 进行比较研究using ASYNC vs without using ASYNC in FastAPI。然而,我得到了一些意想不到的结果,并且不明白为什么。
import uvicorn
from fastapi import FastAPI
import PIL.Image as Image
import requests
from loguru import logger
import sys
log_format = "{level} {process}-{thread} {time} {name}:{line} - {message}"
logger.remove()
logger.add(sys.stderr, format=log_format, backtrace=True, diagnose=True)
logger.add("logs/" + "t_{time}.log", format=log_format, colorize=True, backtrace=True, diagnose=True)
Image.MAX_IMAGE_PIXELS = None
def get_the_image_from_net():
a = requests.get("https://eoimages.gsfc.nasa.gov/images/imagerecords/73000/73751/world.topo.bathy.200407.3x21600x21600.A1.jpg")
return True
app = FastAPI()
@app.get("/expectoPatronum")
def get_image_of_the_new_world():
"""
Gets Image of the World
"""
logger.info("Received request for getting …Run Code Online (Sandbox Code Playgroud) 我有一个 API 端点(FastAPI / Uvicorn)。除此之外,它还向另一个 API 请求信息。当我使用多个并发请求加载 API 时,我开始收到以下错误:
h11._util.LocalProtocolError: can't handle event type ConnectionClosed when role=SERVER and state=SEND_RESPONSE
Run Code Online (Sandbox Code Playgroud)
在正常环境中,我会利用request.session,但我知道它不是完全线程安全的。
因此,在 FastAPI 等框架内使用请求的正确方法是什么,其中多个线程将requests同时使用该库?
fastapi ×9
python ×8
starlette ×3
uvicorn ×3
python-3.x ×2
async-await ×1
asynchronous ×1
gunicorn ×1
logging ×1
python-anyio ×1