我正在用 FastAPI 编写我的第一个项目,但我有点挣扎。特别是,我不确定我应该如何在我的应用程序中使用 asyncpg 连接池。目前我的情况是这样的
在 db.py 我有
pgpool = None
async def get_pool():
global pgpool
if not pgpool:
pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
return pgpool
Run Code Online (Sandbox Code Playgroud)
然后在单个文件中我使用 get_pool 作为依赖项。
@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
# ... do things ...
Run Code Online (Sandbox Code Playgroud)
首先,我拥有的每个端点都使用数据库,因此为每个函数添加依赖参数似乎很愚蠢。其次,这似乎是一种迂回的做事方式。我定义了一个全局,然后我定义了一个返回该全局的函数,然后我注入了该函数。我相信有更自然的方式来解决它。
我看到有人建议将我需要的任何内容作为属性添加到应用程序对象
@app.on_event("startup")
async def startup():
app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
Run Code Online (Sandbox Code Playgroud)
但是当我有多个带有路由器的文件时它不起作用,我不知道如何从路由器对象访问应用程序对象。
我错过了什么?
我想插入多行并使用asyncpg获取ID,我发现了两种方法:1:生成这样的sql
INSERT INTO films (code, title, did, date_prod, kind) VALUES
('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy')
RETURNING id;
Run Code Online (Sandbox Code Playgroud)
2:在for循环中使用prepared语句
values =(('B6717', 'Tampopo', 110, '1985-02-10', 'Comedy'),
('HG120', 'The Dinner Game', 140, DEFAULT, 'Comedy'))
stmnt = connection.prepare("INSERT INTO films (code, title, did, date_prod, kind) VALUES $1, $2, $3, $4, $5 RETURNING id")
for val in values:
stmnt.fetchval(*val)
Run Code Online (Sandbox Code Playgroud)
哪种方式我必须更喜欢100x次700,000行,或者有一些方法来结合这些方法?我完全是绿色的,所以在我身上扔一些烤肉
我有一些方法负责通过 id 从某个表中获取数据。该数据采用字符串格式。我需要将它们转换为 json。
async def my_async_method():
conn = await asyncpg.connect(**db_conf)
row = await conn.fetchrow(
'SELECT database.schema.table.some_table '
'FROM database.schema.some_table'
'WHERE database.schema.some_table.id = $1')
import_transaction = json.loads(row[0])
await conn.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(my_async_method())
Run Code Online (Sandbox Code Playgroud)
在使用 asyncpg 的情况下,将数据从字符串转换为 json 的正确方法是什么?我将不胜感激您的帮助。
我正在尝试将asyncpg与AWS Lambda一起使用,并在尝试时遇到下一个错误 import asyncpg
Unable to import module 'handler': No module named asyncpg.protocol.protocol'
Run Code Online (Sandbox Code Playgroud)
我在这个答案中导入了python依赖项,无论是否有虚拟环境.
UPD.发现这个使用自定义编译的repo psycopg2适用于AWS Lambda,但是asyncpg没有找到关于编译Lambda友好asyncpg包的替代或指令.
我正在使用 asyncpg 连接 Heroku postgresql 中的数据库,使用 python:
import asyncpg
async def create_db_pool():
bot.pg_con = await asyncpg.create_pool(dsn="postgres://....", host="....amazonaws.com", user="xxx", database="yyy", port="5432", password="12345")
Run Code Online (Sandbox Code Playgroud)
它一直工作得很好,直到我收到了来自 heroku 的一封电子邮件,建议我进行维护:
Maintenance (DATABASE_URL on myappname) is starting now. We will update you when it has completed.
然后就出现了这个错误:
asyncpg.exceptions.InvalidAuthorizationSpecificationError: no pg_hba.conf entry for host "123.456.789.10", user "xxx", database "yyy", SSL off
Run Code Online (Sandbox Code Playgroud)
我尝试遵循一些帮助,例如 putssl=True
但出现此错误:
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate (_ssl.c:1108)
Run Code Online (Sandbox Code Playgroud)
和放一样ssl="allow"
asyncpg.exceptions.InvalidPasswordError: password authentication failed for user "xxx"
Run Code Online (Sandbox Code Playgroud)
我能做什么来解决这个问题?
几乎如标题所示,我正在尝试设置 Tortoise-ORM 模型,其中包含与 Postgresql 数组列相对应的字段。
似乎要正确地做到这一点,我需要从 asyncpg (因为它具有完整的数组支持)向上扩展 Tortoise Field 进行构建。然而,我刚刚开始使用乌龟,也许有一些更好/更简单的方法/有人已经做了类似的事情。
我正在使用 SqlAlchemy 1.4.18(异步),我相信我遇到了无法解释的竞争条件。底层数据库是Postgres与asyncpg由SQLAlchemy的内部使用。
我在 SQL Alchemy Core 中有以下插入函数。
async def create_device(
device_id: str,
device_type: DeviceType,
account_type: AccountType = AccountType.FREE,
expires_at: Optional[datetime] = None,
account_id: Optional[int] = None,
is_banned: bool = False,
last_login_at: Optional[datetime] = None,
) -> datetime:
if expires_at is None:
expires_at = datetime.utcnow().replace(second=0, microsecond=0) + timedelta(
days=7
)
async with engine.begin() as conn:
await conn.execute(
DeviceTable.insert().values(
id=device_id,
type=device_type,
expires_at=expires_at,
account_type=account_type,
account_id=account_id,
is_banned=is_banned,
last_login_at=last_login_at,
),
)
return expires_at
Run Code Online (Sandbox Code Playgroud)
单元测试自行成功运行。但是,当我在测试类中运行所有测试时,此测试每次都会失败。
@pytest.mark.asyncio
@patch("service.email_service.EmailService.confirm_token")
async def test_confirm_email_already_confirmed(self, mock_token, …Run Code Online (Sandbox Code Playgroud) 我有一个带有异步 sqlalchemy 的异步 FastApi 应用程序,源代码:
from sqlalchemy import (
Column,
String,
)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.decl_api import DeclarativeMeta
from app.config import settings
engine = create_async_engine(settings.DATABASE_URL, pool_per_ping=True)
Base: DeclarativeMeta = declarative_base()
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
class Titles(Base):
__tablename__ = "titles"
id = Column(String(100), primary_key=True)
title = Column(String(100), unique=True)
async def get_session() -> AsyncSession:
async with async_session() as session:
yield session
Run Code Online (Sandbox Code Playgroud)
import .database
from fastapi_utils.cbv import cbv
from …Run Code Online (Sandbox Code Playgroud) 我正在开发一个使用异步 SQLAlchemy 连接asyncpg到 PostgreSQL 数据库的项目。问题是这样的:当我与数据库建立太多连接时,它会引发以下异常:
asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already
Run Code Online (Sandbox Code Playgroud)
这基本上是 Postgres 本身和配置的限制。但遗憾的是,会话无法处理此问题,例如尝试多次连接。是否有任何解决方法可以使其无异常地工作?
这是设置:
asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already
Run Code Online (Sandbox Code Playgroud) 假设我有一个具有以下格式的表:
| 最喜欢的食物 | 最喜欢的饮料 |
|---|---|
| 比萨 | 水 |
| 冰淇淋 | 橙汁 |
| 比萨 | 牛奶 |
我如何pizza使用Python返回两行作为最喜欢的食物?
我已经fetchrow实现了其他功能,但当然它只适用于一行。
理想情况下,它们将被分类到字典中(例如{favoritefood: pizza, favoritedrink: water:)
asyncpg ×10
python ×8
postgresql ×5
fastapi ×2
sqlalchemy ×2
aws-lambda ×1
discord.py ×1
exception ×1
heroku ×1
sql ×1
ssl ×1
tortoise-orm ×1