Sid*_*Sid 6 python python-multithreading connection-pool fastapi
我将 FastAPI 用于生产应用程序,该应用程序几乎完全使用 asyncio,除非访问数据库。该数据库仍然依赖于同步 SQLAlchemy,因为异步版本当时仍处于 alpha(或早期 beta)状态。
虽然我们的服务最终会在访问数据库时进行同步阻塞调用,但它仍然包含在异步函数中。我们确实运行了多个工作进程和应用程序的多个实例,以确保我们不会遇到严重的瓶颈。
我知道 FastAPI 在使用该def controller_method方法时使用线程提供并发性,但我似乎找不到有关它如何控制环境的任何细节。有人可以帮助我了解如何控制进程可以生成的最大线程数。如果达到系统限制怎么办?
当我使用异步等待模型时,我在中间件中创建数据库连接对象,并将其注入到控制器操作中。
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
await _set_request_id()
try:
request.state.db = get_sessionmaker(scope_func=None)
response = await call_next(request)
finally:
if request.state.db.is_active:
request.state.db.close()
return response
Run Code Online (Sandbox Code Playgroud)
当通过线程完成时,控制器是否已经在单独的线程中调用,确保每个请求都有单独的连接?
现在,如果我无法限制主进程生成的线程数量,如果我的应用程序突然收到大量请求,它是否会超出数据库连接池限制并最终阻止我的应用程序?
FastAPI 是否有一个可以配置的中央线程池,或者是由 Uvicorn 控制的?
我看到 Uvicorn 有一个配置,可以使用该--limit-concurrency 60标志来限制并发性。这是控制在线程模式下创建的并发线程的数量吗?
如果是这样,这是否应该始终低于我的连接池(连接池 + max_overflow=40)
那么在这种情况下,我允许 uvicorn 并发限制为 60,我的数据库连接池配置应该是这样的?
engine = sqlalchemy.create_engine(
cfg("DB_URL"),
pool_size=40,
max_overflow=20,
echo=False,
pool_use_lifo=False,
pool_recycle=120
)
Run Code Online (Sandbox Code Playgroud)
在这种情况下是否使用了中央线程池?是否有任何示例项目可供我查看,以了解在大规模部署时如何配置。
我使用 Netflix Dispatch 作为参考,但如果还有其他项目,我肯定想看看。
Fastapi 使用 Starlette 作为底层框架。Starlette 提供了一种在使用anyiodef的线程池中启动路径操作的机制。因此,我们可以通过设置anyio的total_tokens属性来限制可以同时执行的线程数量。 CapacityLimiter
下面的例子:
import threading
import anyio
import uvicorn
from fastapi import FastAPI
import time
import logging
THREADS_LIMIT = 5
logging.basicConfig(level=logging.DEBUG)
app = FastAPI()
class Counter(object):
def __init__(self):
self._value = 0
self._lock = threading.Lock()
def increment(self):
with self._lock:
self._value += 1
def decrement(self):
with self._lock:
self._value -= 1
def value(self):
with self._lock:
return self._value
counter = Counter()
@app.get("/start_task")
def start_task():
counter.increment()
logging.info("Route started. Counter: %d", counter.value())
time.sleep(10)
counter.decrement()
logging.info("Route stopped. Counter: %d", counter.value())
return "Task done"
@app.on_event("startup")
async def startup_event():
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = THREADS_LIMIT
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
Run Code Online (Sandbox Code Playgroud)
尝试并行打开 50 个连接:
seq 1 50 | xargs -n1 -P50 curl "http://localhost:8000/start_task"
Run Code Online (Sandbox Code Playgroud)
我们看到同时处理的请求数量限制为 5。
输出:
INFO:root:Route started. Counter: 1
INFO:root:Route started. Counter: 2
INFO:root:Route started. Counter: 3
INFO:root:Route started. Counter: 4
INFO:root:Route started. Counter: 5
INFO:root:Route stopped. Counter: 4
INFO:uvicorn.access:127.0.0.1:60830 - "GET /start_task HTTP/1.1" 200
INFO:root:Route stopped. Counter: 3
INFO:root:Route started. Counter: 4
INFO:uvicorn.access:127.0.0.1:60832 - "GET /start_task HTTP/1.1" 200
INFO:root:Route started. Counter: 5
...
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
3031 次 |
| 最近记录: |