FastAPI 中使用 asyncpg 时出现大量 ResourceWarning 警告

Dmi*_*try 5 python-asyncio asyncpg fastapi

我有一个使用异步 SQLAlchemy 的异步 FastAPI 应用程序,源代码如下:

database.py

from typing import AsyncGenerator

from sqlalchemy import (
    Column,
    String,
)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.decl_api import DeclarativeMeta

from app.config import settings


# Shared async engine. The keyword is ``pool_pre_ping`` (not ``pool_per_ping``):
# it makes the pool test a connection before handing it out, and unknown
# keyword arguments are rejected by the engine factory.
engine = create_async_engine(settings.DATABASE_URL, pool_pre_ping=True)
# Declarative base class that the ORM models in this module inherit from.
Base: DeclarativeMeta = declarative_base()
# Factory producing AsyncSession objects bound to the engine.
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


class Titles(Base):
    """ORM model mapped to the ``titles`` table."""

    __tablename__ = "titles"
    # String primary key, declared with length 100.
    id = Column(String(100), primary_key=True)
    # Title text, declared with length 100 and a unique constraint.
    title = Column(String(100), unique=True)


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield one ``AsyncSession`` created by the ``async_session`` factory.

    The function contains ``yield``, so it is an async generator and is
    annotated as ``AsyncGenerator[AsyncSession, None]`` rather than as a
    plain ``AsyncSession``.

    Yields:
        The session opened by the ``async with`` block; the block exits
        once the consumer has finished with the generator.
    """
    async with async_session() as session:
        yield session
Run Code Online (Sandbox Code Playgroud)

routers.py

from fastapi_utils.cbv import cbv
from fastapi_utils.inferring_router import InferringRouter

# ``import .database`` is a SyntaxError; a relative import of a sibling module
# has to be written in the ``from . import name`` form.
from . import database


# Router on which the endpoints of this module are registered.
router = InferringRouter()


async def get_titles(session: AsyncSession):
    """Fetch every ``Titles`` row through the given session.

    Args:
        session: async session used to execute the SELECT statement.

    Returns:
        The result of ``scalars().all()`` on the executed statement, i.e.
        the selected ``Titles`` objects.
    """
    # The original line had an unbalanced extra ")" and did not parse.
    results = await session.execute(select(database.Titles))
    return results.scalars().all()


@cbv(router)
class TitlesView:
    # Class-based view grouping the titles endpoints on ``router``.
    # Session supplied by the ``database.get_session`` dependency.
    session: AsyncSession = Depends(database.get_session)

    @router.get("/titles", status_code=HTTP_200_OK)
    async def get(self) -> List[TitlesSchema]:
        # Load the ORM rows, then convert each one into its response schema.
        rows = await get_titles(self.session)
        return [TitlesSchema.from_orm(row) for row in rows]
Run Code Online (Sandbox Code Playgroud)

main.py

from fastapi import FastAPI

from app.routers import router 


def create_app() -> FastAPI:
    """Build the FastAPI application and register the titles router.

    The router is included without a prefix. FastAPI asserts that a router
    prefix must not end with ``/`` (the routes themselves already start with
    ``/``), so ``prefix="/"`` fails as soon as the router is included.

    Returns:
        The configured ``FastAPI`` instance.
    """
    fast_api_app = FastAPI()
    fast_api_app.include_router(router, tags=["Titles"])

    return fast_api_app


# Module-level ASGI application object.
app = create_app()
Run Code Online (Sandbox Code Playgroud)

manage.py

import asyncio
import sys

from .database import async_session, Base, engine


async def init_models():
    """Create the tables registered on ``Base.metadata`` that do not exist yet.

    The engine's connection pool is disposed before the coroutine returns.
    Without this, the pooled asyncpg connection is still open when
    ``asyncio.run()`` closes the event loop, and it is reported as
    ``ResourceWarning: unclosed connection`` / ``unclosed transport``.
    """
    try:
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all, checkfirst=True)
    finally:
        # Close pooled connections while the event loop is still running,
        # even if table creation raised.
        await engine.dispose()
    
if __name__ == "__main__":
    # Script entry point: run the table initialisation to completion, then
    # report on stdout that it has finished.
    asyncio.run(init_models())
    sys.stdout.write("Models initiated\n")
Run Code Online (Sandbox Code Playgroud)

它通过 Docker 运行:

python manage.py
CMD ["uvicorn", "main:app", "--reload", "--host", "0.0.0.0", "--port", "8000", "--limit-max-requests", "10000"]
Run Code Online (Sandbox Code Playgroud)

在 init_models() 函数执行完毕并输出 Models initiated 消息之后,我看到了几个警告:

app_1  | Models initiated
app_1  | /usr/local/lib/python3.9/site-packages/asyncpg/connection.py:131: ResourceWarning: unclosed connection <asyncpg.connection.Connection object at 0x7efe5a613c80>; run in asyncio debug mode to show the traceback of connection origin
app_1  | /usr/local/lib/python3.9/asyncio/sslproto.py:320: ResourceWarning: unclosed transport <asyncio.sslproto._SSLProtocolTransport object at 0x7efe5a631700>
app_1  | /usr/local/lib/python3.9/asyncio/selector_events.py:704: ResourceWarning: unclosed transport <_SelectorSocketTransport fd=6>
app_1  | INFO: Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
app_1  | INFO: Started reloader process [15] using statreload
app_1  | INFO: Started server process [17]
app_1  | INFO: Waiting for application startup.
app_1  | INFO: Application startup complete.
Run Code Online (Sandbox Code Playgroud)

进行更改后,我看到一堆警告:

app_1   | WARNING: StatReload detected file change in 'ref_info/main.py'. Reloading...
app_1   | INFO: Shutting down
app_1   | INFO: Waiting for application shutdown.
app_1   | INFO: Application shutdown complete.
app_1   | INFO: Finished server process [15]
app_1   | sys:1: ResourceWarning: unclosed file <_io.TextIOWrapper name=0 mode='r' encoding='UTF-8'>
app_1   | INFO: Started server process [16]
app_1   | INFO: Waiting for application startup.
app_1   | INFO: Application startup complete.
Run Code Online (Sandbox Code Playgroud)

这样正常吗?我需要把这些警告隐藏掉吗?还是我哪里配置错了?

Dmi*_*try 2

好的,我解决了。

# Async engine built from settings.DATABASE_ASYNC_URI; echo is set to "debug"
# when settings.DEBUG is truthy and is disabled otherwise.
engine = create_async_engine(
    settings.DATABASE_ASYNC_URI,
    echo="debug" if settings.DEBUG else False,
)
# Session factory producing AsyncSession objects bound to the engine above.
async_session = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autoflush=True,
    autocommit=False,
    expire_on_commit=False,
)


async def get_session() -> AsyncGenerator[AsyncSession, None]:
    """Yield one session created by the ``async_session`` factory.

    The session is managed by the ``async with`` block, which exits once the
    consumer has finished with the generator.
    """
    async with async_session() as session:
        # Narrow the type for static checkers; sessionmaker is configured
        # with class_=AsyncSession.
        assert isinstance(session, AsyncSession)
        yield session


async def connect() -> None:
    """Create any tables declared on ``Base.metadata`` that are missing.

    Opens a transaction on the engine and runs the synchronous
    ``create_all`` call inside it.
    """
    async with engine.begin() as connection:
        create_tables = Base.metadata.create_all
        await connection.run_sync(create_tables, checkfirst=True)


async def disconnect() -> None:
    """Dispose of the module-level engine when it is set."""
    # NOTE(review): the guard presumably protects against an unset engine;
    # confirm whether `engine` can ever be falsy here.
    if engine:
        # Dispose the engine so its pooled connections are closed.
        await engine.dispose()
Run Code Online (Sandbox Code Playgroud)

将 connect 和 disconnect 分别添加到 FastAPI 应用程序的启动(startup)和关闭(shutdown)事件中。