标签: asyncpg

如何在 FastAPI 中进行持久的数据库连接?

我正在用 FastAPI 编写我的第一个项目,但我有点挣扎。特别是,我不确定我应该如何在我的应用程序中使用 asyncpg 连接池。目前我的情况是这样的

在 db.py 我有

pgpool = None


async def get_pool():
    global pgpool
    if not pgpool:
        pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
    return pgpool
Run Code Online (Sandbox Code Playgroud)

然后在单个文件中我使用 get_pool 作为依赖项。

@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
    # ... do things ...
Run Code Online (Sandbox Code Playgroud)

首先,我拥有的每个端点都使用数据库,因此为每个函数添加依赖参数似乎很愚蠢。其次,这似乎是一种迂回的做事方式。我定义了一个全局,然后我定义了一个返回该全局的函数,然后我注入了该函数。我相信有更自然的方式来解决它。

我看到有人建议将我需要的任何内容作为属性添加到应用程序对象

@app.on_event("startup")
async def startup():
    app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
Run Code Online (Sandbox Code Playgroud)

但是当我有多个带有路由器的文件时它不起作用,我不知道如何从路由器对象访问应用程序对象。

我错过了什么?

python asyncpg fastapi

7
推荐指数
1
解决办法
6042
查看次数

使用asyncpg插入多行的最佳方法

我想插入多行并使用asyncpg获取ID,我发现了两种方法:1:生成这样的sql

INSERT INTO films (code, title, did, date_prod, kind) VALUES
    ('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
    ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy')
RETURNING id;
Run Code Online (Sandbox Code Playgroud)

2:在for循环中使用prepared语句

values =(('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
        ('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy'))
stmnt = connection.prepare("INSERT INTO films (code, title, did, date_prod, kind) VALUES $1, $2, $3, $4, $5  RETURNING id")
for val in values:
    stmnt.fetchval(*val)
Run Code Online (Sandbox Code Playgroud)

哪种方式我必须更喜欢100x次700,000行,或者有一些方法来结合这些方法?我完全是绿色的,所以在我身上扔一些烤肉

python postgresql python-asyncio asyncpg

6
推荐指数
3
解决办法
3820
查看次数

asyncpg。将记录转换为 JSON 的正确方法是什么

我有一些方法负责通过 id 从某个表中获取数据。该数据采用字符串格式。我需要将它们转换为 json。

async def my_async_method():
    conn = await asyncpg.connect(**db_conf)
    row = await conn.fetchrow(
        'SELECT database.schema.table.some_table '
        'FROM database.schema.some_table'
        'WHERE database.schema.some_table.id = $1')
    import_transaction = json.loads(row[0])
await conn.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(my_async_method())
Run Code Online (Sandbox Code Playgroud)

在使用 asyncpg 的情况下,将数据从字符串转换为 json 的正确方法是什么?我将不胜感激您的帮助。

python postgresql asyncpg

6
推荐指数
1
解决办法
8527
查看次数

Asyncpg和AWS Lambda

我正在尝试将asyncpg与AWS Lambda一起使用,并在尝试时遇到下一个错误 import asyncpg

Unable to import module 'handler': No module named asyncpg.protocol.protocol'
Run Code Online (Sandbox Code Playgroud)

我在这个答案中导入了python依赖项,无论是否有虚拟环境.

UPD.发现这个使用自定义编译的repo psycopg2适用于AWS Lambda,但是asyncpg没有找到关于编译Lambda友好asyncpg包的替代或指令.

python amazon-web-services aws-lambda asyncpg

5
推荐指数
1
解决办法
412
查看次数

asyncpg 错误:Heroku 中“没有主机的 pg_hba.conf 条目”

我正在使用 asyncpg 连接 Heroku postgresql 中的数据库,使用 python:

import asyncpg

async def create_db_pool():
   bot.pg_con = await asyncpg.create_pool(dsn="postgres://....", host="....amazonaws.com", user="xxx", database="yyy", port="5432", password="12345")
Run Code Online (Sandbox Code Playgroud)

它一直工作得很好,直到我收到了来自 heroku 的一封电子邮件,建议我进行维护:
Maintenance (DATABASE_URL on myappname) is starting now. We will update you when it has completed.

然后就出现了这个错误:

asyncpg.exceptions.InvalidAuthorizationSpecificationError: no pg_hba.conf entry for host "123.456.789.10", user "xxx", database "yyy", SSL off
Run Code Online (Sandbox Code Playgroud)

我尝试遵循一些帮助,例如 putssl=True 但出现此错误:

ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1108)
Run Code Online (Sandbox Code Playgroud)

和放一样ssl="allow"

asyncpg.exceptions.InvalidPasswordError: password authentication failed for user "xxx"
Run Code Online (Sandbox Code Playgroud)

我能做什么来解决这个问题?

python ssl heroku asyncpg discord.py

5
推荐指数
1
解决办法
2026
查看次数

如何在 Tortoise-ORM 中使用 Postgresql 数组字段

几乎如标题所示,我正在尝试设置 Tortoise-ORM 模型,其中包含与 Postgresql 数组列相对应的字段。

似乎要正确地做到这一点,我需要从 asyncpg (因为它具有完整的数组支持)向上扩展 Tortoise Field 进行构建。然而,我刚刚开始使用乌龟,也许有一些更好/更简单的方法/有人已经做了类似的事情。

python postgresql asyncpg tortoise-orm

5
推荐指数
1
解决办法
3195
查看次数

SqlAlchemy 1.4 抛出 InternalServerError(缓存查找失败,类型 3912040)

我正在使用 SqlAlchemy 1.4.18(异步),我相信我遇到了无法解释的竞争条件。底层数据库是Postgresasyncpg由SQLAlchemy的内部使用。

我在 SQL Alchemy Core 中有以下插入函数。

async def create_device(
    device_id: str,
    device_type: DeviceType,
    account_type: AccountType = AccountType.FREE,
    expires_at: Optional[datetime] = None,
    account_id: Optional[int] = None,
    is_banned: bool = False,
    last_login_at: Optional[datetime] = None,
) -> datetime:
    if expires_at is None:
        expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
            days=7
        )
    async with engine.begin() as conn:
        await conn.execute(
            DeviceTable.insert().values(
                id=device_id,
                type=device_type,
                expires_at=expires_at,
                account_type=account_type,
                account_id=account_id,
                is_banned=is_banned,
                last_login_at=last_login_at,
            ),
        )
        return expires_at
Run Code Online (Sandbox Code Playgroud)

单元测试自行成功运行。但是,当我在测试类中运行所有测试时,此测试每次都会失败。

@pytest.mark.asyncio
    @patch("service.email_service.EmailService.confirm_token")
    async def test_confirm_email_already_confirmed(self, mock_token, …
Run Code Online (Sandbox Code Playgroud)

python postgresql sqlalchemy asyncpg

5
推荐指数
1
解决办法
126
查看次数

FastApi 中带有 asyncpg 的大量资源警告

我有一个带有异步 sqlalchemy 的异步 FastApi 应用程序,源代码:

数据库.py

from sqlalchemy import (
    Column,
    String,
)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.decl_api import DeclarativeMeta

from app.config import settings


engine = create_async_engine(settings.DATABASE_URL, pool_per_ping=True)
Base: DeclarativeMeta = declarative_base()
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


class Titles(Base):
    __tablename__ = "titles"
    id = Column(String(100), primary_key=True)
    title = Column(String(100), unique=True)


async def get_session() -> AsyncSession:
    async with async_session() as session:
        yield session
Run Code Online (Sandbox Code Playgroud)

路由器.py

import .database
from fastapi_utils.cbv import cbv
from …
Run Code Online (Sandbox Code Playgroud)

python-asyncio asyncpg fastapi

5
推荐指数
1
解决办法
1145
查看次数

如何处理 Sqlalchemy 中的 asyncpg.exceptions.TooManyConnectionsError

我正在开发一个使用异步 SQLAlchemy 连接asyncpg到 PostgreSQL 数据库的项目。问题是这样的:当我与数据库建立太多连接时,它会引发以下异常:

asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already
Run Code Online (Sandbox Code Playgroud)

这基本上是 Postgres 本身和配置的限制。但遗憾的是,会话无法处理此问题,例如尝试多次连接。是否有任何解决方法可以使其无异常地工作?

这是设置:

asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already
Run Code Online (Sandbox Code Playgroud)

postgresql sqlalchemy exception python-asyncio asyncpg

5
推荐指数
1
解决办法
1522
查看次数

如何在 AsyncPG 中返回多行?

假设我有一个具有以下格式的表:

最喜欢的食物 最喜欢的饮料
比萨
冰淇淋 橙汁
比萨 牛奶

我如何pizza使用Python返回两行作为最喜欢的食物?

我已经fetchrow实现了其他功能,但当然它只适用于一行。

理想情况下,它们将被分类到字典中(例如{favoritefood: pizza, favoritedrink: water:)

python sql asyncpg

4
推荐指数
1
解决办法
4851
查看次数